Any more comments on this pattern suggested by Jean?

Regards
Sumit Chawla
On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <[email protected]> wrote:

What I said earlier is not quite accurate, though my advice is the same. Here are the corrections:

- The Write transform actually has a too-general name, and Write.of(Sink) only really works for finite data. It re-windows into the global window and replaces any triggers.
- So the special case in the Flink runner actually just _enables_ a (fake) Sink to work.

We should probably rename Write to some more specific name that indicates the particular strategy, and make it easier for a user to decide whether that pattern is what they want. And the transform as-is should probably reject unbounded inputs.

So you should still proceed with implementation via ParDo and your own logic. If you want some logic similar to Write (but with different windowing and triggering), then it is a pretty simple composite to derive something from.

On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <[email protected]> wrote:

Thanks Jean

This is an interesting pattern. I see that it's implemented as a PTransform, with constructs (WriteOperation/Writer) pretty similar to the Sink<T> interface. Would love to hear more pros/cons of this pattern :) . It definitely gives more control over connection initialization and cleanup.

Regards
Sumit Chawla

On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré <[email protected]> wrote:

Hi Sumit,

I created a PR containing Cassandra IO with a sink:

https://github.com/apache/incubator-beam/pull/592

Maybe it can help you.

Regards
JB

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

On 07/28/2016 09:00 PM, Chawla,Sumit wrote:

Hi Kenneth

Thanks for looking into it. I am currently trying to implement Sinks for writing data into Cassandra/Titan DB. My immediate goal is to run it on the Flink Runner.

Regards
Sumit Chawla

On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles <[email protected]> wrote:

Hi Sumit,

I see what has happened here, from that snippet you pasted from the Flink runner's code [1]. Thanks for looking into it!

The Flink runner today appears to reject Write.Bound transforms in streaming mode if the sink is not an instance of UnboundedFlinkSink. The intent of that code, I believe, was to special-case UnboundedFlinkSink to make it easy to use an existing Flink sink, not to disable all other Write transforms. What do you think, Max?

Until we fix this issue, you should use ParDo transforms to do the writing. If you can share a little about your sink, we may be able to suggest patterns for implementing it. Like Eugene said, the Write.of(Sink) transform is just a specialized pattern of ParDo's, not a Beam primitive.

Kenn

[1] https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203
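A minimal sketch of the ParDo-based write Kenn suggests, assuming the override-style DoFn of this era (startBundle/processElement/finishBundle); MyRecord, CassandraClient, connect(), and insert() are illustrative placeholders, not a real driver API or anything confirmed in this thread:

import org.apache.beam.sdk.transforms.DoFn;

// Writes each element to an external store, holding one connection per bundle.
class CassandraWriteFn extends DoFn<MyRecord, Void> {
  private transient CassandraClient client;  // placeholder client type

  @Override
  public void startBundle(Context c) throws Exception {
    // Open the connection once per bundle rather than once per element.
    client = CassandraClient.connect("cassandra-host");  // placeholder call
  }

  @Override
  public void processElement(ProcessContext c) throws Exception {
    client.insert(c.element());  // placeholder single-row write
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    client.close();  // flush pending writes and release the connection
  }
}

Applied as input.apply(ParDo.of(new CassandraWriteFn())), this behaves the same on bounded and unbounded input, which is why it sidesteps the Write.of(Sink) restriction discussed above.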
On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov <[email protected]> wrote:

Thanks Sumit. Looks like your question is, indeed, specific to the Flink runner, and I'll then defer to somebody familiar with it.

On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit <[email protected]> wrote:

Thanks a lot Eugene.

> My immediate requirement is to run this Sink on FlinkRunner. Which mandates
> that my implementation must also implement SinkFunction<>. In that case,
> none of the Sink<> methods get called anyway.

I am using the FlinkRunner. The Sink implementation that I was writing by extending the Sink<> class had to implement the Flink-specific SinkFunction for the correct translation:

private static class WriteSinkStreamingTranslator<T>
    implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {

  @Override
  public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
    String name = transform.getName();
    PValue input = context.getInput(transform);

    Sink<T> sink = transform.getSink();
    if (!(sink instanceof UnboundedFlinkSink)) {
      throw new UnsupportedOperationException(
          "At the time, only unbounded Flink sinks are supported.");
    }

    DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);

    inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
      // Unwrap the windowed value before handing it to the Flink sink.
      @Override
      public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
        out.collect(value.getValue());
      }
    }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
  }
}

Regards
Sumit Chawla

On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov <[email protected]> wrote:

Hi Sumit,

All reusable parts of a pipeline, including connectors to storage systems, should be packaged as PTransforms.

Sink is an advanced API that you can use under the hood to implement the transform, if this particular connector benefits from this API - but you don't have to, and many connectors indeed don't need it, and are simpler to implement just as wrappers around a couple of ParDo's writing the data.

Even if the connector is implemented using a Sink, packaging the connector as a PTransform is important because it's easier to apply in a pipeline and because it's more future-proof (the author of the connector may later change it to use something else rather than Sink under the hood without breaking existing users).
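A minimal sketch of that packaging, assuming the PTransform.apply override used in incubator-beam at the time; CassandraIO.Write is an illustrative name, and CassandraWriteFn is the placeholder DoFn sketched earlier:

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

public class CassandraIO {
  // Packages the ParDo-based write so users just call .apply(new CassandraIO.Write()).
  public static class Write extends PTransform<PCollection<MyRecord>, PDone> {
    @Override
    public PDone apply(PCollection<MyRecord> input) {
      // The underlying implementation (a ParDo today, possibly a Sink later)
      // stays hidden behind this composite.
      input.apply(ParDo.of(new CassandraWriteFn()));
      return PDone.in(input.getPipeline());
    }
  }
}

If the implementation later moves from ParDo to a Sink (or anything else), callers of CassandraIO.Write are unaffected, which is the future-proofing Eugene describes.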
Sink is, currently, useful in the following case:

- You're writing a bounded amount of data (we do not yet have an unbounded Sink analogue).
- The location you're writing to is known at pipeline construction time, and does not depend on the data itself (support for "data-dependent" sinks is on the radar: https://issues.apache.org/jira/browse/BEAM-92).
- The storage system you're writing to has a distinct "initialization" and "finalization" step, allowing the write operation to appear atomic (either all data is written or none). This mostly applies to files (where writing is done by first writing to a temporary directory, and then renaming all files to their final location), but there can be other cases too.

Here's an example GCP connector using the Sink API under the hood:

https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797

Most other non-file-based connectors, indeed, don't use it (KafkaIO, DatastoreIO, BigtableIO, etc.).

I'm not familiar with the Flink API; however, I'm a bit confused by your last paragraph: the Beam programming model is intentionally runner-agnostic, so that you can run exactly the same code on different runners.

On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit <[email protected]> wrote:

Hi

Please suggest the best way to write a Sink in Beam. I see that there is a Sink<T> abstract class which is in an experimental state. What is the expected outcome of this one? Is the API frozen, or could it still change? Most of the existing Sink implementations, like KafkaIO.Write, are not using this interface and instead extend PTransform<PCollection<KV<K, V>>, PDone>. Would these be changed to extend Sink<>?

My immediate requirement is to run this Sink on the FlinkRunner, which mandates that my implementation must also implement SinkFunction<>. In that case, none of the Sink<> methods get called anyway.

Regards
Sumit Chawla
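For reference, the WriteOperation/Writer lifecycle that Sumit and Eugene refer to looks roughly like this to an implementer. This is an approximate skeleton of the experimental incubator-beam Sink API of the time, not verbatim; MySink, MyWriteOperation, MyWriter, MyRecord, and the String writer-result type are all placeholders:

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.options.PipelineOptions;

class MySink extends Sink<MyRecord> {
  @Override
  public void validate(PipelineOptions options) {
    // Fail fast at pipeline construction time if the sink's spec is invalid.
  }

  @Override
  public WriteOperation<MyRecord, String> createWriteOperation(PipelineOptions options) {
    return new MyWriteOperation(this);
  }
}

class MyWriteOperation extends Sink.WriteOperation<MyRecord, String> {
  private final MySink sink;

  MyWriteOperation(MySink sink) { this.sink = sink; }

  @Override
  public void initialize(PipelineOptions options) throws Exception {
    // "Initialization" step: e.g. create a temporary output location.
  }

  @Override
  public Sink.Writer<MyRecord, String> createWriter(PipelineOptions options) throws Exception {
    return new MyWriter(this);
  }

  @Override
  public void finalize(Iterable<String> writerResults, PipelineOptions options) throws Exception {
    // "Finalization" step: commit the per-bundle results, e.g. rename temporary
    // files into place, so the overall write appears atomic.
  }

  @Override
  public Sink<MyRecord> getSink() { return sink; }

  @Override
  public Coder<String> getWriterResultCoder() { return StringUtf8Coder.of(); }
}

class MyWriter extends Sink.Writer<MyRecord, String> {
  private final MyWriteOperation op;
  private String bundleId;

  MyWriter(MyWriteOperation op) { this.op = op; }

  @Override
  public void open(String uId) throws Exception { bundleId = uId; /* open per-bundle output */ }

  @Override
  public void write(MyRecord value) throws Exception { /* append one element */ }

  @Override
  public String close() throws Exception { return bundleId; /* handed to finalize() */ }

  @Override
  public Sink.WriteOperation<MyRecord, String> getWriteOperation() { return op; }
}

A sink like this would then be applied through the Write transform (Write.to(new MySink()) in the incubator-beam API of the time), which, per Kenn's messages above, the Flink runner currently rejects in streaming mode unless the sink is an UnboundedFlinkSink.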
