It is the preferred pattern I think. Is your source bounded or unbounded
(i.e. streaming)? If it is latter, your sink could even be simpler than
JB's. e.g. KafkaIO.write() where it just writes the messages to Kafka in
processElement().

The pros are pretty clear : runner independent, pure Beam, simpler code.
cons : no checkpoint/rollback, I don't know if Flink specific sink provides
this either.

On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <[email protected]>
wrote:

> Any more comments on this pattern suggested by Jean?
>
> Regards
> Sumit Chawla
>
>
> On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <[email protected]>
> wrote:
>
> > What I said earlier is not quite accurate, though my advice is the same.
> > Here are the corrections:
> >
> >  - The Write transform actually has a too-general name, and
> Write.of(Sink)
> > only really works for finite data. It re-windows into the global window
> and
> > replaces any triggers.
> >  - So the special case in the Flink runner actually just _enables_ a
> (fake)
> > Sink to work.
> >
> > We should probably rename Write to some more specific name that indicates
> > the particular strategy, and make it easier for a user to decide whether
> > that pattern is what they want. And the transform as-is should probably
> > reject unbounded inputs.
> >
> > So you should still proceed with implementation via ParDo and your own
> > logic. If you want some logic similar to Write (but with different
> > windowing and triggering) then it is a pretty simple composite to derive
> > something from.
> >
> > On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <[email protected]>
> > wrote:
> >
> > > Thanks Jean
> > >
> > > This is an interesting pattern here.  I see that its implemented as
> > > PTransform, with constructs ( WriteOperation/Writer)  pretty similar to
> > > Sink<T> interface.  Would love to hear more pros/cons of this pattern
> :)
> > .
> > > Definitely it gives more control over connection initialization and
> > > cleanup.
> > >
> > > Regards
> > > Sumit Chawla
> > >
> > >
> > > On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <
> [email protected]>
> > > wrote:
> > >
> > > > Hi Sumit,
> > > >
> > > > I created a PR containing Cassandra IO with a sink:
> > > >
> > > > https://github.com/apache/incubator-beam/pull/592
> > > >
> > > > Maybe it can help you.
> > > >
> > > > Regards
> > > > JB
> > > >
> > > >
> > > > On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
> > > >
> > > >> Hi Kenneth
> > > >>
> > > >> Thanks for looking into it. I am currently trying to implement Sinks
> > for
> > > >> writing data into Cassandra/Titan DB.  My immediate goal is to run
> it
> > on
> > > >> Flink Runner.
> > > >>
> > > >>
> > > >>
> > > >> Regards
> > > >> Sumit Chawla
> > > >>
> > > >>
> > > >> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles
> > > <[email protected]
> > > >> >
> > > >> wrote:
> > > >>
> > > >> Hi Sumit,
> > > >>>
> > > >>> I see what has happened here, from that snippet you pasted from the
> > > Flink
> > > >>> runner's code [1]. Thanks for looking into it!
> > > >>>
> > > >>> The Flink runner today appears to reject Write.Bounded transforms
> in
> > > >>> streaming mode if the sink is not an instance of
> UnboundedFlinkSink.
> > > The
> > > >>> intent of that code, I believe, was to special case
> > UnboundedFlinkSink
> > > to
> > > >>> make it easy to use an existing Flink sink, not to disable all
> other
> > > >>> Write
> > > >>> transforms. What do you think, Max?
> > > >>>
> > > >>> Until we fix this issue, you should use ParDo transforms to do the
> > > >>> writing.
> > > >>> If you can share a little about your sink, we may be able to
> suggest
> > > >>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> > > >>> transform is just a specialized pattern of ParDo's, not a Beam
> > > primitive.
> > > >>>
> > > >>> Kenn
> > > >>>
> > > >>> [1]
> > > >>>
> > > >>>
> > > >>>
> > >
> >
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
> > > >>>
> > > >>>
> > > >>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> > > >>> [email protected]> wrote:
> > > >>>
> > > >>> Thanks Sumit. Looks like your question is, indeed, specific to the
> > > Flink
> > > >>>> runner, and I'll then defer to somebody familiar with it.
> > > >>>>
> > > >>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <
> > [email protected]>
> > > >>>> wrote:
> > > >>>>
> > > >>>> Thanks a lot Eugene.
> > > >>>>>
> > > >>>>> My immediate requirement is to run this Sink on FlinkRunner.
> Which
> > > >>>>>>>>
> > > >>>>>>> mandates that my implementation must also implement
> > SinkFunction<>.
> > > >>>>> In
> > > >>>>> that >>>case, none of the Sink<> methods get called anyway.
> > > >>>>>
> > > >>>>> I am using FlinkRunner. The Sink implementation that i was
> writing
> > by
> > > >>>>> extending Sink<> class had to implement Flink Specific
> SinkFunction
> > > for
> > > >>>>>
> > > >>>> the
> > > >>>>
> > > >>>>> correct translation.
> > > >>>>>
> > > >>>>> private static class WriteSinkStreamingTranslator<T> implements
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>>
> > > >>>
> > > >>>> {
> > > >>>>>
> > > >>>>>    @Override
> > > >>>>>    public void translateNode(Write.Bound<T> transform,
> > > >>>>> FlinkStreamingTranslationContext context) {
> > > >>>>>      String name = transform.getName();
> > > >>>>>      PValue input = context.getInput(transform);
> > > >>>>>
> > > >>>>>      Sink<T> sink = transform.getSink();
> > > >>>>>      if (!(sink instanceof UnboundedFlinkSink)) {
> > > >>>>>        throw new UnsupportedOperationException("At the time, only
> > > >>>>> unbounded Flink sinks are supported.");
> > > >>>>>      }
> > > >>>>>
> > > >>>>>      DataStream<WindowedValue<T>> inputDataSet =
> > > >>>>> context.getInputDataStream(input);
> > > >>>>>
> > > >>>>>      inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>,
> > > >>>>>
> > > >>>> Object>()
> > > >>>
> > > >>>> {
> > > >>>>
> > > >>>>>        @Override
> > > >>>>>        public void flatMap(WindowedValue<T> value,
> > Collector<Object>
> > > >>>>> out) throws Exception {
> > > >>>>>          out.collect(value.getValue());
> > > >>>>>        }
> > > >>>>>      }).addSink(((UnboundedFlinkSink<Object>)
> > > >>>>> sink).getFlinkSource()).name(name);
> > > >>>>>    }
> > > >>>>> }
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Regards
> > > >>>>> Sumit Chawla
> > > >>>>>
> > > >>>>>
> > > >>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> > > >>>>> [email protected]> wrote:
> > > >>>>>
> > > >>>>> Hi Sumit,
> > > >>>>>>
> > > >>>>>> All reusable parts of a pipeline, including connectors to
> storage
> > > >>>>>>
> > > >>>>> systems,
> > > >>>>>
> > > >>>>>> should be packaged as PTransform's.
> > > >>>>>>
> > > >>>>>> Sink is an advanced API that you can use under the hood to
> > implement
> > > >>>>>>
> > > >>>>> the
> > > >>>>
> > > >>>>> transform, if this particular connector benefits from this API -
> > but
> > > >>>>>>
> > > >>>>> you
> > > >>>>
> > > >>>>> don't have to, and many connectors indeed don't need it, and are
> > > >>>>>>
> > > >>>>> simpler
> > > >>>>
> > > >>>>> to
> > > >>>>>
> > > >>>>>> implement just as wrappers around a couple of ParDo's writing
> the
> > > >>>>>>
> > > >>>>> data.
> > > >>>
> > > >>>>
> > > >>>>>> Even if the connector is implemented using a Sink, packaging the
> > > >>>>>>
> > > >>>>> connector
> > > >>>>>
> > > >>>>>> as a PTransform is important because it's easier to apply in a
> > > >>>>>>
> > > >>>>> pipeline
> > > >>>
> > > >>>> and
> > > >>>>>
> > > >>>>>> because it's more future-proof (the author of the connector may
> > > later
> > > >>>>>> change it to use something else rather than Sink under the hood
> > > >>>>>>
> > > >>>>> without
> > > >>>
> > > >>>> breaking existing users).
> > > >>>>>>
> > > >>>>>> Sink is, currently, useful in the following case:
> > > >>>>>> - You're writing a bounded amount of data (we do not yet have an
> > > >>>>>>
> > > >>>>> unbounded
> > > >>>>>
> > > >>>>>> Sink analogue)
> > > >>>>>> - The location you're writing to is known at pipeline
> construction
> > > >>>>>>
> > > >>>>> time,
> > > >>>>
> > > >>>>> and does not depend on the data itself (support for
> > "data-dependent"
> > > >>>>>>
> > > >>>>> sinks
> > > >>>>>
> > > >>>>>> is on the radar https://issues.apache.org/jira/browse/BEAM-92)
> > > >>>>>> - The storage system you're writing to has a distinct
> > > >>>>>>
> > > >>>>> "initialization"
> > > >>>
> > > >>>> and
> > > >>>>>
> > > >>>>>> "finalization" step, allowing the write operation to appear
> atomic
> > > >>>>>>
> > > >>>>> (either
> > > >>>>>
> > > >>>>>> all data is written or none). This mostly applies to files
> (where
> > > >>>>>>
> > > >>>>> writing
> > > >>>>
> > > >>>>> is done by first writing to a temporary directory, and then
> > renaming
> > > >>>>>>
> > > >>>>> all
> > > >>>>
> > > >>>>> files to their final location), but there can be other cases too.
> > > >>>>>>
> > > >>>>>> Here's an example GCP connector using the Sink API under the
> hood:
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > >
> >
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> > > >>>
> > > >>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
> > > >>>>>>
> > > >>>>> DatastoreIO,
> > > >>>>>
> > > >>>>>> BigtableIO etc.)
> > > >>>>>>
> > > >>>>>> I'm not familiar with the Flink API, however I'm a bit confused
> by
> > > >>>>>>
> > > >>>>> your
> > > >>>
> > > >>>> last paragraph: the Beam programming model is intentionally
> > > >>>>>> runner-agnostic, so that you can run exactly the same code on
> > > >>>>>>
> > > >>>>> different
> > > >>>
> > > >>>> runners.
> > > >>>>>>
> > > >>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <
> > > [email protected]
> > > >>>>>>
> > > >>>>>
> > > >>>> wrote:
> > > >>>>>>
> > > >>>>>> Hi
> > > >>>>>>>
> > > >>>>>>> Please suggest me on what is the best way to write a Sink in
> > > >>>>>>>
> > > >>>>>> Beam.  I
> > > >>>
> > > >>>> see
> > > >>>>>
> > > >>>>>> that there is a Sink<T> abstract class which is in experimental
> > > >>>>>>>
> > > >>>>>> state.
> > > >>>>
> > > >>>>> What is the expected outcome of this one? Do we have the api
> > > >>>>>>>
> > > >>>>>> frozen,
> > > >>>
> > > >>>> or
> > > >>>>
> > > >>>>> this could still change?  Most of the existing Sink
> implementations
> > > >>>>>>>
> > > >>>>>> like
> > > >>>>>
> > > >>>>>> KafkaIO.Write are not using this interface, and instead extends
> > > >>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be
> changed
> > to
> > > >>>>>>>
> > > >>>>>> extend
> > > >>>>>>
> > > >>>>>>> Sink<>.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> My immediate requirement is to run this Sink on FlinkRunner.
> > Which
> > > >>>>>>>
> > > >>>>>> mandates
> > > >>>>>>
> > > >>>>>>> that my implementation must also implement SinkFunction<>.  In
> > that
> > > >>>>>>>
> > > >>>>>> case,
> > > >>>>>
> > > >>>>>> none of the Sink<> methods get called anyway.
> > > >>>>>>>
> > > >>>>>>> Regards
> > > >>>>>>> Sumit Chawla
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > > > --
> > > > Jean-Baptiste Onofré
> > > > [email protected]
> > > > http://blog.nanthrax.net
> > > > Talend - http://www.talend.com
> > > >
> > >
> >
>

Reply via email to