Hi Sumit,

Not sure I follow you.

Which resource cleanup are you talking about:
- the close() on the reader (source)?
- the finishBundle() on the writer (sink)?

Regards
JB

On 07/29/2016 09:35 PM, Chawla,Sumit  wrote:
Hi Raghu

My source is going to be unbounded (streaming), with writes to Cassandra.
My only concern with KafkaIO.write() is that the producer is closed after
every bundle, so every bundle may have to open a new connection to Kafka.
(Please correct me if I am wrong: I am assuming a bundle to be equivalent
to a window size/mini-batch.)

In Jean's implementation I see a different style of resource cleanup. Can
someone please explain when that finalize method is called?

Regards
Sumit Chawla


On Fri, Jul 29, 2016 at 10:45 AM, Raghu Angadi <[email protected]>
wrote:

It is the preferred pattern, I think. Is your source bounded or unbounded
(i.e. streaming)? If it is the latter, your sink could even be simpler than
JB's, e.g. KafkaIO.write(), which just writes the messages to Kafka in
processElement().
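
Roughly, that style of sink is just a DoFn that sends each element as it
is processed. A minimal sketch (the KafkaWriteFn name, topic, and producer
settings below are made up for illustration, not KafkaIO's actual code):

import java.util.Properties;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class KafkaWriteFn extends DoFn<String, Void> {
  private transient KafkaProducer<String, String> producer;

  @StartBundle
  public void startBundle(Context c) {
    // Note: a fresh producer (and hence connection) per bundle.
    producer = new KafkaProducer<>(config());
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    producer.send(new ProducerRecord<>("my-topic", c.element()));
  }

  @FinishBundle
  public void finishBundle(Context c) {
    producer.flush();
    producer.close();
  }

  private static Properties config() {
    Properties p = new Properties();
    p.put("bootstrap.servers", "localhost:9092");
    p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return p;
  }
}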

The pros are pretty clear: runner independent, pure Beam, simpler code.
The cons: no checkpoint/rollback; I don't know if the Flink-specific sink
provides this either.

On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <[email protected]>
wrote:

Any more comments on this pattern suggested by Jean?

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <[email protected]> wrote:

What I said earlier is not quite accurate, though my advice is the same.
Here are the corrections:

 - The Write transform actually has a too-general name, and Write.of(Sink)
only really works for finite data. It re-windows into the global window and
replaces any triggers.
 - So the special case in the Flink runner actually just _enables_ a (fake)
Sink to work.

We should probably rename Write to some more specific name that indicates
the particular strategy, and make it easier for a user to decide whether
that pattern is what they want. And the transform as-is should probably
reject unbounded inputs.

So you should still proceed with implementation via ParDo and your own
logic. If you want some logic similar to Write (but with different
windowing and triggering) then it is a pretty simple composite to derive
something from.
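
For instance, such a composite could look roughly like this (a sketch
only; the WindowedWrite name and the choice of fixed windows are arbitrary
placeholders):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

class WindowedWrite<T> extends PTransform<PCollection<T>, PDone> {
  private final DoFn<T, Void> writerFn;

  WindowedWrite(DoFn<T, Void> writerFn) {
    this.writerFn = writerFn;
  }

  @Override
  public PDone apply(PCollection<T> input) {
    // Unlike Write.of(Sink), keep (or set) whatever windowing/triggering
    // you actually want before the ParDo that does the writing.
    input
        .apply(Window.<T>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(ParDo.of(writerFn));
    return PDone.in(input.getPipeline());
  }
}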

On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <[email protected]> wrote:

Thanks Jean

This is an interesting pattern here. I see that it's implemented as a
PTransform, with constructs (WriteOperation/Writer) pretty similar to the
Sink<T> interface. Would love to hear more pros/cons of this pattern :).
It definitely gives more control over connection initialization and
cleanup.

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <[email protected]> wrote:

Hi Sumit,

I created a PR containing Cassandra IO with a sink:

https://github.com/apache/incubator-beam/pull/592

Maybe it can help you.

Regards
JB


On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:

Hi Kenneth

Thanks for looking into it. I am currently trying to implement sinks for
writing data into Cassandra/Titan DB. My immediate goal is to run it on
the Flink runner.

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <[email protected]> wrote:

Hi Sumit,

I see what has happened here, from that snippet you pasted from the Flink
runner's code [1]. Thanks for looking into it!

The Flink runner today appears to reject Write.Bounded transforms in
streaming mode if the sink is not an instance of UnboundedFlinkSink. The
intent of that code, I believe, was to special-case UnboundedFlinkSink to
make it easy to use an existing Flink sink, not to disable all other Write
transforms. What do you think, Max?

Until we fix this issue, you should use ParDo transforms to do the writing.
If you can share a little about your sink, we may be able to suggest
patterns for implementing it. Like Eugene said, the Write.of(Sink)
transform is just a specialized pattern of ParDo's, not a Beam primitive.
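
For example, a ParDo-based Cassandra write could look roughly like the
following (a sketch only; the CassandraWriteFn name, the schema, and the
Datastax client calls are just assumptions about your setup):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.apache.beam.sdk.transforms.DoFn;

class MyRow { String id; String value; }  // stand-in for your element type

class CassandraWriteFn extends DoFn<MyRow, Void> {
  private transient Cluster cluster;
  private transient Session session;

  @StartBundle
  public void startBundle(Context c) {
    // Connection setup, once per bundle.
    cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    session = cluster.connect("my_keyspace");
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    MyRow row = c.element();
    session.execute("INSERT INTO my_table (id, value) VALUES (?, ?)",
        row.id, row.value);
  }

  @FinishBundle
  public void finishBundle(Context c) {
    // Connection cleanup, once per bundle.
    session.close();
    cluster.close();
  }
}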

Kenn

[1]
https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203

On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <[email protected]> wrote:

Thanks Sumit. Looks like your question is, indeed, specific to the Flink
runner, and I'll then defer to somebody familiar with it.

On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <[email protected]> wrote:

Thanks a lot Eugene.

My immediate requirement is to run this Sink on FlinkRunner, which
mandates that my implementation must also implement SinkFunction<>. In
that case, none of the Sink<> methods get called anyway.

I am using FlinkRunner. The Sink implementation that I was writing by
extending the Sink<> class had to implement the Flink-specific
SinkFunction for the correct translation.

private static class WriteSinkStreamingTranslator<T> implements
    FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {

  @Override
  public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
    String name = transform.getName();
    PValue input = context.getInput(transform);

    Sink<T> sink = transform.getSink();
    if (!(sink instanceof UnboundedFlinkSink)) {
      throw new UnsupportedOperationException(
          "At the time, only unbounded Flink sinks are supported.");
    }

    DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);

    inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
      @Override
      public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
        out.collect(value.getValue());
      }
    }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
  }
}

Regards
Sumit Chawla


On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <[email protected]> wrote:

Hi Sumit,

All reusable parts of a pipeline, including connectors to storage systems,
should be packaged as PTransform's.

Sink is an advanced API that you can use under the hood to implement the
transform, if this particular connector benefits from this API - but you
don't have to, and many connectors indeed don't need it, and are simpler
to implement just as wrappers around a couple of ParDo's writing the data.
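
For example, such a wrapper can be as small as this (a sketch; MyDbIO and
WriteFn are made-up names, not a real connector):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

public class MyDbIO {
  public static Write write(String url) {
    return new Write(url);
  }

  public static class Write extends PTransform<PCollection<String>, PDone> {
    private final String url;

    Write(String url) {
      this.url = url;
    }

    @Override
    public PDone apply(PCollection<String> input) {
      // The ParDo does the actual writing; the PTransform is just the
      // packaging, so the internals can change without breaking callers.
      input.apply(ParDo.of(new WriteFn(url)));
      return PDone.in(input.getPipeline());
    }
  }

  static class WriteFn extends DoFn<String, Void> {
    private final String url;

    WriteFn(String url) {
      this.url = url;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      // Write c.element() to the external system here.
    }
  }
}

A pipeline then just does input.apply(MyDbIO.write("db://host")) and never
needs to know what the transform does internally.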


Even if the connector is implemented using a Sink, packaging the connector
as a PTransform is important because it's easier to apply in a pipeline and
because it's more future-proof (the author of the connector may later
change it to use something else rather than Sink under the hood without
breaking existing users).

Sink is, currently, useful in the following case:
- You're writing a bounded amount of data (we do not yet have an unbounded
Sink analogue).
- The location you're writing to is known at pipeline construction time,
and does not depend on the data itself (support for "data-dependent" sinks
is on the radar: https://issues.apache.org/jira/browse/BEAM-92).
- The storage system you're writing to has a distinct "initialization" and
"finalization" step, allowing the write operation to appear atomic (either
all data is written or none). This mostly applies to files (where writing
is done by first writing to a temporary directory, and then renaming all
files to their final location), but there can be other cases too.
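
To illustrate that last (file) case outside of the Beam API, the
temp-then-rename trick is roughly this (paths here are made up):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

class AtomicFileWrite {
  static void writeAtomically(byte[] data) throws Exception {
    Path dir = Paths.get("/data/out");
    // "Initialization": write everything to a temporary file first.
    Path tmp = Files.createTempFile(dir, "part-", ".tmp");
    Files.write(tmp, data);
    // "Finalization": the rename publishes the file atomically, so readers
    // see either all of the data or none of it.
    Files.move(tmp, dir.resolve("part-00000"), StandardCopyOption.ATOMIC_MOVE);
  }
}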

Here's an example GCP connector using the Sink API under the hood:

https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797

Most other non-file-based connectors, indeed, don't (KafkaIO, DatastoreIO,
BigtableIO etc.).

I'm not familiar with the Flink API, however I'm a bit confused by your
last paragraph: the Beam programming model is intentionally
runner-agnostic, so that you can run exactly the same code on different
runners.

On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <[email protected]> wrote:

Hi

Please suggest the best way to write a Sink in Beam. I see that there is a
Sink<T> abstract class which is in an experimental state. What is the
expected outcome of this one? Do we have the API frozen, or could this
still change? Most of the existing Sink implementations like KafkaIO.Write
are not using this interface, and instead extend
PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to extend
Sink<>?

My immediate requirement is to run this Sink on FlinkRunner, which
mandates that my implementation must also implement SinkFunction<>. In
that case, none of the Sink<> methods get called anyway.

Regards
Sumit Chawla

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
