Hi JB I was referring to CassandraWriteOperation.finalize()
Regards Sumit Chawla On Fri, Jul 29, 2016 at 12:46 PM, Jean-Baptiste Onofré <[email protected]> wrote: > Hi Sumit, > > Not sure I follow you. > > Which resource cleanup are you talking about: > - the close() on the reader (source) ? > - the finishBundle() on the writer (sink) ? > > Regards > JB > > > On 07/29/2016 09:35 PM, Chawla,Sumit wrote: > >> Hi Raghu >> >> My source is going to be unbounded (streaming) with writes to Cassandra. >> Only concern with KafkaIO. write is that producer is closed after every >> bundle, and every bundle may have to open a new connection to Kafka. ( >> Please correct me if i am wrong: I am assuming the bundle to be equivalent >> to Window Size\Mini-batch). >> >> In Jean's implementation i see a different style of resource cleanup. Can >> someone please explain when that finalize method is called? >> >> Regards >> Sumit Chawla >> >> >> On Fri, Jul 29, 2016 at 10:45 AM, Raghu Angadi <[email protected] >> > >> wrote: >> >> It is the preferred pattern I think. Is your source bounded or unbounded >>> (i.e. streaming)? If it is latter, your sink could even be simpler than >>> JB's. e.g. KafkaIO.write() where it just writes the messages to Kafka in >>> processElement(). >>> >>> The pros are pretty clear : runner independent, pure Beam, simpler code. >>> cons : no checkpoint/rollback, I don't know if Flink specific sink >>> provides >>> this either. >>> >>> On Fri, Jul 29, 2016 at 10:18 AM, Chawla,Sumit <[email protected]> >>> wrote: >>> >>> Any more comments on this pattern suggested by Jean? >>>> >>>> Regards >>>> Sumit Chawla >>>> >>>> >>>> On Thu, Jul 28, 2016 at 1:34 PM, Kenneth Knowles <[email protected] >>>> >>>> wrote: >>>> >>>> What I said earlier is not quite accurate, though my advice is the >>>>> >>>> same. >>> >>>> Here are the corrections: >>>>> >>>>> - The Write transform actually has a too-general name, and >>>>> >>>> Write.of(Sink) >>>> >>>>> only really works for finite data. It re-windows into the global window >>>>> >>>> and >>>> >>>>> replaces any triggers. >>>>> - So the special case in the Flink runner actually just _enables_ a >>>>> >>>> (fake) >>>> >>>>> Sink to work. >>>>> >>>>> We should probably rename Write to some more specific name that >>>>> >>>> indicates >>> >>>> the particular strategy, and make it easier for a user to decide >>>>> >>>> whether >>> >>>> that pattern is what they want. And the transform as-is should probably >>>>> reject unbounded inputs. >>>>> >>>>> So you should still proceed with implementation via ParDo and your own >>>>> logic. If you want some logic similar to Write (but with different >>>>> windowing and triggering) then it is a pretty simple composite to >>>>> >>>> derive >>> >>>> something from. >>>>> >>>>> On Thu, Jul 28, 2016 at 12:37 PM, Chawla,Sumit <[email protected] >>>>> >>>> >>>> wrote: >>>>> >>>>> Thanks Jean >>>>>> >>>>>> This is an interesting pattern here. I see that its implemented as >>>>>> PTransform, with constructs ( WriteOperation/Writer) pretty similar >>>>>> >>>>> to >>> >>>> Sink<T> interface. Would love to hear more pros/cons of this pattern >>>>>> >>>>> :) >>>> >>>>> . >>>>> >>>>>> Definitely it gives more control over connection initialization and >>>>>> cleanup. >>>>>> >>>>>> Regards >>>>>> Sumit Chawla >>>>>> >>>>>> >>>>>> On Thu, Jul 28, 2016 at 12:20 PM, Jean-Baptiste Onofré < >>>>>> >>>>> [email protected]> >>>> >>>>> wrote: >>>>>> >>>>>> Hi Sumit, >>>>>>> >>>>>>> I created a PR containing Cassandra IO with a sink: >>>>>>> >>>>>>> https://github.com/apache/incubator-beam/pull/592 >>>>>>> >>>>>>> Maybe it can help you. >>>>>>> >>>>>>> Regards >>>>>>> JB >>>>>>> >>>>>>> >>>>>>> On 07/28/2016 09:00 PM, Chawla,Sumit wrote: >>>>>>> >>>>>>> Hi Kenneth >>>>>>>> >>>>>>>> Thanks for looking into it. I am currently trying to implement >>>>>>>> >>>>>>> Sinks >>> >>>> for >>>>> >>>>>> writing data into Cassandra/Titan DB. My immediate goal is to run >>>>>>>> >>>>>>> it >>>> >>>>> on >>>>> >>>>>> Flink Runner. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Regards >>>>>>>> Sumit Chawla >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Jul 28, 2016 at 11:56 AM, Kenneth Knowles >>>>>>>> >>>>>>> <[email protected] >>>>>> >>>>>>> >>>>>>>>> wrote: >>>>>>>> >>>>>>>> Hi Sumit, >>>>>>>> >>>>>>>>> >>>>>>>>> I see what has happened here, from that snippet you pasted from >>>>>>>>> >>>>>>>> the >>> >>>> Flink >>>>>> >>>>>>> runner's code [1]. Thanks for looking into it! >>>>>>>>> >>>>>>>>> The Flink runner today appears to reject Write.Bounded transforms >>>>>>>>> >>>>>>>> in >>>> >>>>> streaming mode if the sink is not an instance of >>>>>>>>> >>>>>>>> UnboundedFlinkSink. >>>> >>>>> The >>>>>> >>>>>>> intent of that code, I believe, was to special case >>>>>>>>> >>>>>>>> UnboundedFlinkSink >>>>> >>>>>> to >>>>>> >>>>>>> make it easy to use an existing Flink sink, not to disable all >>>>>>>>> >>>>>>>> other >>>> >>>>> Write >>>>>>>>> transforms. What do you think, Max? >>>>>>>>> >>>>>>>>> Until we fix this issue, you should use ParDo transforms to do >>>>>>>>> >>>>>>>> the >>> >>>> writing. >>>>>>>>> If you can share a little about your sink, we may be able to >>>>>>>>> >>>>>>>> suggest >>>> >>>>> patterns for implementing it. Like Eugene said, the >>>>>>>>> >>>>>>>> Write.of(Sink) >>> >>>> transform is just a specialized pattern of ParDo's, not a Beam >>>>>>>>> >>>>>>>> primitive. >>>>>> >>>>>>> >>>>>>>>> Kenn >>>>>>>>> >>>>>>>>> [1] >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>> >>>>> >>>> >>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L203 >>> >>>> >>>>>>>>> >>>>>>>>> On Wed, Jul 27, 2016 at 5:57 PM, Eugene Kirpichov < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>> Thanks Sumit. Looks like your question is, indeed, specific to >>>>>>>>> >>>>>>>> the >>> >>>> Flink >>>>>> >>>>>>> runner, and I'll then defer to somebody familiar with it. >>>>>>>>>> >>>>>>>>>> On Wed, Jul 27, 2016 at 5:25 PM Chawla,Sumit < >>>>>>>>>> >>>>>>>>> [email protected]> >>>>> >>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Thanks a lot Eugene. >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. >>>>>>>>>>> >>>>>>>>>> Which >>>> >>>>> >>>>>>>>>>>>>> mandates that my implementation must also implement >>>>>>>>>>>>> >>>>>>>>>>>> SinkFunction<>. >>>>> >>>>>> In >>>>>>>>>>> that >>>case, none of the Sink<> methods get called anyway. >>>>>>>>>>> >>>>>>>>>>> I am using FlinkRunner. The Sink implementation that i was >>>>>>>>>>> >>>>>>>>>> writing >>>> >>>>> by >>>>> >>>>>> extending Sink<> class had to implement Flink Specific >>>>>>>>>>> >>>>>>>>>> SinkFunction >>>> >>>>> for >>>>>> >>>>>>> >>>>>>>>>>> the >>>>>>>>>> >>>>>>>>>> correct translation. >>>>>>>>>>> >>>>>>>>>>> private static class WriteSinkStreamingTranslator<T> implements >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>> >>>>> >>>> >>> FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> >>> >>>> >>>>>>>>> { >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> @Override >>>>>>>>>>> public void translateNode(Write.Bound<T> transform, >>>>>>>>>>> FlinkStreamingTranslationContext context) { >>>>>>>>>>> String name = transform.getName(); >>>>>>>>>>> PValue input = context.getInput(transform); >>>>>>>>>>> >>>>>>>>>>> Sink<T> sink = transform.getSink(); >>>>>>>>>>> if (!(sink instanceof UnboundedFlinkSink)) { >>>>>>>>>>> throw new UnsupportedOperationException("At the time, >>>>>>>>>>> >>>>>>>>>> only >>> >>>> unbounded Flink sinks are supported."); >>>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>>> DataStream<WindowedValue<T>> inputDataSet = >>>>>>>>>>> context.getInputDataStream(input); >>>>>>>>>>> >>>>>>>>>>> inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, >>>>>>>>>>> >>>>>>>>>>> Object>() >>>>>>>>>> >>>>>>>>> >>>>>>>>> { >>>>>>>>>> >>>>>>>>>> @Override >>>>>>>>>>> public void flatMap(WindowedValue<T> value, >>>>>>>>>>> >>>>>>>>>> Collector<Object> >>>>> >>>>>> out) throws Exception { >>>>>>>>>>> out.collect(value.getValue()); >>>>>>>>>>> } >>>>>>>>>>> }).addSink(((UnboundedFlinkSink<Object>) >>>>>>>>>>> sink).getFlinkSource()).name(name); >>>>>>>>>>> } >>>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Regards >>>>>>>>>>> Sumit Chawla >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Wed, Jul 27, 2016 at 4:53 PM, Eugene Kirpichov < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi Sumit, >>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> All reusable parts of a pipeline, including connectors to >>>>>>>>>>>> >>>>>>>>>>> storage >>>> >>>>> >>>>>>>>>>>> systems, >>>>>>>>>>> >>>>>>>>>>> should be packaged as PTransform's. >>>>>>>>>>>> >>>>>>>>>>>> Sink is an advanced API that you can use under the hood to >>>>>>>>>>>> >>>>>>>>>>> implement >>>>> >>>>>> >>>>>>>>>>>> the >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> transform, if this particular connector benefits from this API >>>>>>>>>>> >>>>>>>>>> - >>> >>>> but >>>>> >>>>>> >>>>>>>>>>>> you >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> don't have to, and many connectors indeed don't need it, and >>>>>>>>>>> >>>>>>>>>> are >>> >>>> >>>>>>>>>>>> simpler >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> to >>>>>>>>>>> >>>>>>>>>>> implement just as wrappers around a couple of ParDo's writing >>>>>>>>>>>> >>>>>>>>>>> the >>>> >>>>> >>>>>>>>>>>> data. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>>> Even if the connector is implemented using a Sink, packaging >>>>>>>>>>>> >>>>>>>>>>> the >>> >>>> >>>>>>>>>>>> connector >>>>>>>>>>> >>>>>>>>>>> as a PTransform is important because it's easier to apply in a >>>>>>>>>>>> >>>>>>>>>>>> pipeline >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> and >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> because it's more future-proof (the author of the connector >>>>>>>>>>>> >>>>>>>>>>> may >>> >>>> later >>>>>> >>>>>>> change it to use something else rather than Sink under the >>>>>>>>>>>> >>>>>>>>>>> hood >>> >>>> >>>>>>>>>>>> without >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> breaking existing users). >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> Sink is, currently, useful in the following case: >>>>>>>>>>>> - You're writing a bounded amount of data (we do not yet have >>>>>>>>>>>> >>>>>>>>>>> an >>> >>>> >>>>>>>>>>>> unbounded >>>>>>>>>>> >>>>>>>>>>> Sink analogue) >>>>>>>>>>>> - The location you're writing to is known at pipeline >>>>>>>>>>>> >>>>>>>>>>> construction >>>> >>>>> >>>>>>>>>>>> time, >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> and does not depend on the data itself (support for >>>>>>>>>>> >>>>>>>>>> "data-dependent" >>>>> >>>>>> >>>>>>>>>>>> sinks >>>>>>>>>>> >>>>>>>>>>> is on the radar https://issues.apache.org/jira/browse/BEAM-92 >>>>>>>>>>>> >>>>>>>>>>> ) >>> >>>> - The storage system you're writing to has a distinct >>>>>>>>>>>> >>>>>>>>>>>> "initialization" >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> and >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> "finalization" step, allowing the write operation to appear >>>>>>>>>>>> >>>>>>>>>>> atomic >>>> >>>>> >>>>>>>>>>>> (either >>>>>>>>>>> >>>>>>>>>>> all data is written or none). This mostly applies to files >>>>>>>>>>>> >>>>>>>>>>> (where >>>> >>>>> >>>>>>>>>>>> writing >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> is done by first writing to a temporary directory, and then >>>>>>>>>>> >>>>>>>>>> renaming >>>>> >>>>>> >>>>>>>>>>>> all >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> files to their final location), but there can be other cases >>>>>>>>>>> >>>>>>>>>> too. >>> >>>> >>>>>>>>>>>> Here's an example GCP connector using the Sink API under the >>>>>>>>>>>> >>>>>>>>>>> hood: >>>> >>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>> >>>>> >>>> >>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1797 >>> >>>> >>>>>>>>> Most other non-file-based connectors, indeed, don't (KafkaIO, >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> DatastoreIO, >>>>>>>>>>> >>>>>>>>>>> BigtableIO etc.) >>>>>>>>>>>> >>>>>>>>>>>> I'm not familiar with the Flink API, however I'm a bit >>>>>>>>>>>> >>>>>>>>>>> confused >>> >>>> by >>>> >>>>> >>>>>>>>>>>> your >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> last paragraph: the Beam programming model is intentionally >>>>>>>>>> >>>>>>>>>>> runner-agnostic, so that you can run exactly the same code on >>>>>>>>>>>> >>>>>>>>>>>> different >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> runners. >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> On Wed, Jul 27, 2016 at 4:30 PM Chawla,Sumit < >>>>>>>>>>>> >>>>>>>>>>> [email protected] >>>>>> >>>>>>> >>>>>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> Hi >>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Please suggest me on what is the best way to write a Sink in >>>>>>>>>>>>> >>>>>>>>>>>>> Beam. I >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> see >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> that there is a Sink<T> abstract class which is in >>>>>>>>>>>> >>>>>>>>>>> experimental >>> >>>> >>>>>>>>>>>>> state. >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> What is the expected outcome of this one? Do we have the api >>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>>> frozen, >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> or >>>>>>>>>> >>>>>>>>>> this could still change? Most of the existing Sink >>>>>>>>>>> >>>>>>>>>> implementations >>>> >>>>> >>>>>>>>>>>>> like >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> KafkaIO.Write are not using this interface, and instead >>>>>>>>>>>> >>>>>>>>>>> extends >>> >>>> PTransform<PCollection<KV<K, V>>, PDone>. Would these be >>>>>>>>>>>>> >>>>>>>>>>>> changed >>>> >>>>> to >>>>> >>>>>> >>>>>>>>>>>>> extend >>>>>>>>>>>> >>>>>>>>>>>> Sink<>. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> My immediate requirement is to run this Sink on FlinkRunner. >>>>>>>>>>>>> >>>>>>>>>>>> Which >>>>> >>>>>> >>>>>>>>>>>>> mandates >>>>>>>>>>>> >>>>>>>>>>>> that my implementation must also implement SinkFunction<>. >>>>>>>>>>>>> >>>>>>>>>>>> In >>> >>>> that >>>>> >>>>>> >>>>>>>>>>>>> case, >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> none of the Sink<> methods get called anyway. >>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Regards >>>>>>>>>>>>> Sumit Chawla >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> -- >>>>>>> Jean-Baptiste Onofré >>>>>>> [email protected] >>>>>>> http://blog.nanthrax.net >>>>>>> Talend - http://www.talend.com >>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> > -- > Jean-Baptiste Onofré > [email protected] > http://blog.nanthrax.net > Talend - http://www.talend.com >
