Any more comments on this pattern suggested by Jean?

Regards
Sumit Chawla


On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <[email protected]>
wrote:

> What I said earlier is not quite accurate, though my advice is the same.
> Here are the corrections:
>
>  - The Write transform actually has a too-general name, and Write.of(Sink)
> only really works for finite data. It re-windows into the global window and
> replaces any triggers.
>  - So the special case in the Flink runner actually just _enables_ a (fake)
> Sink to work.
>
> We should probably rename Write to some more specific name that indicates
> the particular strategy, and make it easier for a user to decide whether
> that pattern is what they want. And the transform as-is should probably
> reject unbounded inputs.
>
> So you should still proceed with implementation via ParDo and your own
> logic. If you want some logic similar to Write (but with different
> windowing and triggering) then it is a pretty simple composite to derive
> something from.
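
A minimal sketch of the "ParDo and your own logic" approach, written in plain Java so it runs without Beam or a database. Everything here is an assumption made for illustration: StoreClient, InMemoryClient and BundleWriterSketch are hypothetical names, and the three lifecycle methods are only loosely named after a DoFn's per-bundle callbacks. The idea is to open the connection when a bundle starts, buffer and flush rows in batches, and always close the connection when the bundle finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a client of the target store (not a real driver API). */
interface StoreClient extends AutoCloseable {
  void writeBatch(List<String> rows);

  @Override
  void close();
}

/** In-memory client, present only so the sketch can run without a database. */
final class InMemoryClient implements StoreClient {
  final List<String> stored = new ArrayList<>();
  boolean closed = false;

  @Override
  public void writeBatch(List<String> rows) {
    stored.addAll(rows);
  }

  @Override
  public void close() {
    closed = true;
  }
}

/** Hypothetical writer with a per-bundle lifecycle; this is not a Beam class. */
final class BundleWriterSketch {
  private final Supplier<StoreClient> clientFactory;
  private final int batchSize;
  private final List<String> buffer = new ArrayList<>();
  private StoreClient client;

  BundleWriterSketch(Supplier<StoreClient> clientFactory, int batchSize) {
    this.clientFactory = clientFactory;
    this.batchSize = batchSize;
  }

  /** Connection initialization: one client per bundle. */
  void startBundle() {
    client = clientFactory.get();
  }

  /** Buffer each row and flush once the batch is full. */
  void processElement(String row) {
    buffer.add(row);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Cleanup: write whatever is left, then close the client even if the flush fails. */
  void finishBundle() {
    try {
      flush();
    } finally {
      client.close();
      client = null;
    }
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    client.writeBatch(new ArrayList<>(buffer));
    buffer.clear();
  }

  public static void main(String[] args) {
    InMemoryClient client = new InMemoryClient();
    BundleWriterSketch writer = new BundleWriterSketch(() -> client, 2);
    writer.startBundle();
    for (String row : new String[] {"a", "b", "c"}) {
      writer.processElement(row);
    }
    writer.finishBundle();
    System.out.println(client.stored + " closed=" + client.closed);
  }
}
```

In a real connector this logic would presumably sit inside a DoFn wrapped by a PTransform, with windowing and triggering chosen by the pipeline rather than forced into the global window.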
>
> On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <[email protected]>
> wrote:
>
> > Thanks Jean
> >
> > This is an interesting pattern.  I see that it's implemented as a
> > PTransform, with constructs (WriteOperation/Writer) pretty similar to the
> > Sink<T> interface.  Would love to hear more pros/cons of this pattern :)
> > It definitely gives more control over connection initialization and
> > cleanup.
> >
> > Regards
> > Sumit Chawla
> >
> >
> > On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <[email protected]>
> > wrote:
> >
> > > Hi Sumit,
> > >
> > > I created a PR containing Cassandra IO with a sink:
> > >
> > > https://github.com/apache/incubator-beam/pull/592
> > >
> > > Maybe it can help you.
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 07/28/2016 09:00 PM, Chawla,Sumit  wrote:
> > >
> > >> Hi Kenneth
> > >>
> > >> Thanks for looking into it. I am currently trying to implement Sinks for
> > >> writing data into Cassandra/Titan DB.  My immediate goal is to run it on
> > >> Flink Runner.
> > >>
> > >>
> > >>
> > >> Regards
> > >> Sumit Chawla
> > >>
> > >>
> > >> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <[email protected]>
> > >> wrote:
> > >>
> > >>> Hi Sumit,
> > >>>
> > >>> I see what has happened here, from that snippet you pasted from the Flink
> > >>> runner's code [1]. Thanks for looking into it!
> > >>>
> > >>> The Flink runner today appears to reject Write.Bound transforms in
> > >>> streaming mode if the sink is not an instance of UnboundedFlinkSink. The
> > >>> intent of that code, I believe, was to special-case UnboundedFlinkSink to
> > >>> make it easy to use an existing Flink sink, not to disable all other Write
> > >>> transforms. What do you think, Max?
> > >>>
> > >>> Until we fix this issue, you should use ParDo transforms to do the writing.
> > >>> If you can share a little about your sink, we may be able to suggest
> > >>> patterns for implementing it. Like Eugene said, the Write.of(Sink)
> > >>> transform is just a specialized pattern of ParDo's, not a Beam primitive.
> > >>>
> > >>> Kenn
> > >>>
> > >>> [1]
> > >>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
> > >>>
> > >>>
> > >>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <
> > >>> [email protected]> wrote:
> > >>>
> > >>>> Thanks Sumit. Looks like your question is, indeed, specific to the Flink
> > >>>> runner, and I'll then defer to somebody familiar with it.
> > >>>>
> > >>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <[email protected]>
> > >>>> wrote:
> > >>>>
> > >>>>> Thanks a lot Eugene.
> > >>>>>
> > >>>>> >>> My immediate requirement is to run this Sink on FlinkRunner. Which
> > >>>>> >>> mandates that my implementation must also implement SinkFunction<>.
> > >>>>> >>> In that case, none of the Sink<> methods get called anyway.
> > >>>>>
> > >>>>> I am using FlinkRunner. The Sink implementation that I was writing by
> > >>>>> extending the Sink<> class had to implement the Flink-specific
> > >>>>> SinkFunction for the correct translation.
> > >>>>>
> > >>>>> private static class WriteSinkStreamingTranslator<T> implements
> > >>>>>     FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
> > >>>>>
> > >>>>>   @Override
> > >>>>>   public void translateNode(Write.Bound<T> transform,
> > >>>>>       FlinkStreamingTranslationContext context) {
> > >>>>>     String name = transform.getName();
> > >>>>>     PValue input = context.getInput(transform);
> > >>>>>
> > >>>>>     Sink<T> sink = transform.getSink();
> > >>>>>     if (!(sink instanceof UnboundedFlinkSink)) {
> > >>>>>       throw new UnsupportedOperationException(
> > >>>>>           "At the time, only unbounded Flink sinks are supported.");
> > >>>>>     }
> > >>>>>
> > >>>>>     DataStream<WindowedValue<T>> inputDataSet =
> > >>>>>         context.getInputDataStream(input);
> > >>>>>
> > >>>>>     inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
> > >>>>>       @Override
> > >>>>>       public void flatMap(WindowedValue<T> value, Collector<Object> out)
> > >>>>>           throws Exception {
> > >>>>>         out.collect(value.getValue());
> > >>>>>       }
> > >>>>>     }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
> > >>>>>   }
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> Regards
> > >>>>> Sumit Chawla
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <
> > >>>>> [email protected]> wrote:
> > >>>>>
> > >>>>>> Hi Sumit,
> > >>>>>>
> > >>>>>> All reusable parts of a pipeline, including connectors to storage
> > >>>>>> systems, should be packaged as PTransform's.
> > >>>>>>
> > >>>>>> Sink is an advanced API that you can use under the hood to implement
> > >>>>>> the transform, if this particular connector benefits from this API -
> > >>>>>> but you don't have to, and many connectors indeed don't need it, and
> > >>>>>> are simpler to implement just as wrappers around a couple of ParDo's
> > >>>>>> writing the data.
> > >>>>>>
> > >>>>>> Even if the connector is implemented using a Sink, packaging the
> > >>>>>> connector as a PTransform is important because it's easier to apply in
> > >>>>>> a pipeline and because it's more future-proof (the author of the
> > >>>>>> connector may later change it to use something else rather than Sink
> > >>>>>> under the hood without breaking existing users).
> > >>>>>>
> > >>>>>> Sink is, currently, useful in the following case:
> > >>>>>> - You're writing a bounded amount of data (we do not yet have an
> > >>>>>> unbounded Sink analogue)
> > >>>>>> - The location you're writing to is known at pipeline construction
> > >>>>>> time, and does not depend on the data itself (support for
> > >>>>>> "data-dependent" sinks is on the radar
> > >>>>>> https://issues.apache.org/jira/browse/BEAM-92)
> > >>>>>> - The storage system you're writing to has a distinct "initialization"
> > >>>>>> and "finalization" step, allowing the write operation to appear atomic
> > >>>>>> (either all data is written or none). This mostly applies to files
> > >>>>>> (where writing is done by first writing to a temporary directory, and
> > >>>>>> then renaming all files to their final location), but there can be
> > >>>>>> other cases too.
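
The file case described above (write to a temporary directory, then rename into place) can be sketched in plain Java. This is not the Beam Sink API: TempThenRenameSketch and its three methods are hypothetical names chosen to echo the initialization / write / finalization steps, and the scratch directory is created inside the destination only to keep both on the same filesystem.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not the Beam Sink API. */
final class TempThenRenameSketch {

  /** "Initialization": create a scratch directory for in-progress output. */
  static Path initialize(Path finalDir) throws IOException {
    Files.createDirectories(finalDir);
    return Files.createTempDirectory(finalDir, "tmp-write-");
  }

  /** Each bundle of records is written to its own file in the scratch directory. */
  static Path writeBundle(Path tempDir, String bundleId, List<String> records)
      throws IOException {
    Path out = tempDir.resolve(bundleId + ".txt");
    Files.write(out, records, StandardCharsets.UTF_8);
    return out;
  }

  /** "Finalization": move finished files into place, then remove the scratch directory. */
  static List<Path> finalizeWrite(Path tempDir, Path finalDir, List<Path> bundles)
      throws IOException {
    List<Path> published = new ArrayList<>();
    for (Path bundle : bundles) {
      Path target = finalDir.resolve(bundle.getFileName());
      // A rename within one filesystem; readers see the whole file or nothing.
      Files.move(bundle, target, StandardCopyOption.ATOMIC_MOVE);
      published.add(target);
    }
    // Succeeds only if every file in the scratch directory was moved out.
    Files.delete(tempDir);
    return published;
  }

  public static void main(String[] args) throws IOException {
    Path finalDir = Files.createTempDirectory("sink-sketch-");
    Path tempDir = initialize(finalDir);
    List<Path> bundles = new ArrayList<>();
    bundles.add(writeBundle(tempDir, "bundle-0", Arrays.asList("a", "b")));
    bundles.add(writeBundle(tempDir, "bundle-1", Arrays.asList("c")));
    List<Path> published = finalizeWrite(tempDir, finalDir, bundles);
    System.out.println("published " + published.size() + " files under " + finalDir);
  }
}
```

Each rename is atomic on its own, so a reader never sees a half-written file. Making the whole set of files appear all-or-nothing needs more than this sketch does, for example cleaning up after a failed finalization.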
> > >>>>>>
> > >>>>>> Here's an example GCP connector using the Sink API under the hood:
> > >>>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797
> > >>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO,
> > >>>>>> DatastoreIO, BigtableIO etc.)
> > >>>>>>
> > >>>>>> I'm not familiar with the Flink API, however I'm a bit confused by your
> > >>>>>> last paragraph: the Beam programming model is intentionally
> > >>>>>> runner-agnostic, so that you can run exactly the same code on different
> > >>>>>> runners.
> > >>>>>>
> > >>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <[email protected]>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi
> > >>>>>>>
> > >>>>>>> Please suggest the best way to write a Sink in Beam.  I see that
> > >>>>>>> there is a Sink<T> abstract class which is in an experimental state.
> > >>>>>>> What is the expected outcome of this one? Is the API frozen, or
> > >>>>>>> could it still change?  Most of the existing Sink implementations,
> > >>>>>>> like KafkaIO.Write, do not use this interface and instead extend
> > >>>>>>> PTransform<PCollection<KV<K, V>>, PDone>. Will these be changed to
> > >>>>>>> extend Sink<>?
> > >>>>>>>
> > >>>>>>> My immediate requirement is to run this Sink on FlinkRunner. Which
> > >>>>>>> mandates that my implementation must also implement SinkFunction<>.
> > >>>>>>> In that case, none of the Sink<> methods get called anyway.
> > >>>>>>>
> > >>>>>>> Regards
> > >>>>>>> Sumit Chawla
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > > --
> > > Jean-Baptiste Onofré
> > > [email protected]
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>
