Hello,

Thanks for the notes, Dan and Eugene, and for taking the time to give the presentation and answer our questions.

I mentioned the ongoing work on dynamic scaling in Flink because I suppose it will eventually address dynamic rebalancing (several changes for dynamic scaling are in progress):

https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
https://lists.apache.org/[email protected]:lte=1M:FLIP-8

Anyway, I am far from an expert on Flink, but the Flink guys can probably give their opinion about this and point to a more precise document than the ones I mentioned.

Thanks again,
Ismaël

On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <[email protected]> wrote:

> Great summary Eugene and Dan.
>
> And thanks again for the details, explanation, and discussion.
>
> Regards
> JB
>
> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
>
>> Thanks for attending, everybody!
>>
>> Here are meeting notes (thanks Dan!).
>>
>> Q: Will SplittableDoFn enable better repartitioning of the input/output data?
>> A: Not really; repartitioning is orthogonal to SDF.
>>
>> Current Source API suffers from lack of composition and scalability because we treat sources too much as metadata, not enough as data.
>>
>> Q (slide with transform expansion): Who does the "magic"?
>> A: The runner. Checkpointing and dynamically splitting restrictions will require collaboration with the runner.
>>
>> Q: How does the runner interact with the DoFn to control the restrictions? Is it related to the centralized job tracker etc.?
>> A: RestrictionTracker is a simple helper object that exists purely on the worker while executing a single partition, and interacts with the worker harness part of the runner. Not to be confused with the centralized job tracker (master) - completely unrelated. Worker harness, of course, interacts with the master in some relevant ways (e.g. Dataflow master can tell "you're a straggler, you should split").
>>
>> Q: Is this a new DoFn subclass, or how will this integrate with the existing code?
>> A: It's a feature of reflection-based DoFn (https://s.apache.org/a-new-do-fn) - just another optional parameter of type RestrictionTracker to processElement() which is dynamically bound via reflection, so fully backward/forward compatible, and looks to users like a regular DoFn.
>>
>> Q: Why is fractionClaimed a double?
>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic rebalancing) requires a uniform way to represent progress through different sources.
>>
>> Q: Spark runner is microbatch-based, so this seems to map well onto checkpoint/resume, right?
>> A: Yes; actually the Dataflow runner is, at a worker level, also microbatch-based. The way SDF interacts with a runner will be very similar to how a Bounded/UnboundedSource interacts with a runner.
>>
>> Q: Using SDF, what would be the "packaging" of the IO?
>> A: Same as currently: package IO's as PTransforms and their implementation under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g. Datastore was recently refactored from BoundedSource to ParDo (ended up simpler and more scalable), transparently to users.
>>
>> Q: What's the timeline; what to do with the IOs currently in development?
>> A: Timeline is O(months). Keep doing what you're doing and working on top of Source APIs when necessary and simple ParDo's otherwise.
>>
>> Q: What's the impact for the runner writers?
>> A: Tentatively expected that most of the code for running an SDF will be common to runners, with some amount of per-runner glue code, just like GBK/windowing/triggering. Impact on Dataflow runner is larger since it supports dynamic rebalancing in batch mode and this is the hardest part, but for other runners shouldn't be too hard.
>>
>> JB: Talend has people who can help with this: e.g. help integrate into Spark runner, refactor IOs etc. Amit also willing to chat about supporting SDF in Spark runner.
>>
>> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will send a link.
>>
>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>
>>> Hi Eugene,
>>>
>>> thanks for the reminder.
>>>
>>> Just to prepare some topics for the call, please find some points:
>>>
>>> 1. Using SDF, what would be the "packaging" of the IO ? It sounds to me that we can keep the IO packaging style (using with* setters for the IO configuration) and replace PTransform, Source, Reader, ... directly with SDF. Correct ?
>>>
>>> 2. What's your plan in terms of release to include SDF ? We have several IOs in preparation and I wonder if it's worth starting to use the new SDF API or not.
>>>
>>> 3. What's the impact for the runner writers ? The runners will have to support SDF, that could be tricky depending on the execution engine. In the worst case where the runner can't fully support SDF, does it mean that most of our IOs will be useless ?
>>>
>>> Just my dumb topics ;)
>>>
>>> Thanks,
>>> See you at 8am !
>>>
>>> Regards
>>> JB
>>>
>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
>>>
>>>> Hello everybody,
>>>>
>>>> Just a reminder:
>>>>
>>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to join the call go to https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
>>>> I intend to go over the proposed design and then have a free-form discussion.
>>>>
>>>> Please have a skim through the proposal doc: https://s.apache.org/splittable-do-fn
>>>> I also made some slides that are basically a trimmed-down version of the doc to use as a guide when conducting the meeting,
>>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing .
>>>>
>>>> I will post notes from the meeting on this thread afterwards.
>>>>
>>>> Thanks, looking forward.
>>>>
>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <[email protected]> wrote:
>>>>
>>>>> This is pretty cool! I'll be there too. (unless the hangout gets too full -- if so, I'll drop out in favor of others who aren't lucky enough to get to talk to Eugene all the time.)
>>>>>
>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <[email protected]> wrote:
>>>>>
>>>>>> +1 I'll join
>>>>>>
>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <[email protected]> wrote:
>>>>>>
>>>>>>> + 1, me2
>>>>>>>
>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]> wrote:
>>>>>>>
>>>>>>>> +1 as in I'll join ;-)
>>>>>>>>
>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Sounds good, thanks!
>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
>>>>>>>>>
>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>>
>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What about Friday 19th ?
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>>
>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi JB,
>>>>>>>>>>>
>>>>>>>>>>> Sounds great, does the suggested time over videoconference work for you?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Eugene
>>>>>>>>>>>>
>>>>>>>>>>>> May we talk together next week ? I like the proposal. I would just need some details for my understanding.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Regards
>>>>>>>>>>>> JB
>>>>>>>>>>>>
>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>>>
>>>>>>>>>>>>> What are your thoughts on this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain more about this proposal if necessary, since I understand it is a lot to digest.
>>>>>>>>>>>>>
>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
>>>>>>>>>>>>> (link: https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn - I confirmed that it can be joined without being logged into a Google account)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Who'd be interested in attending, and does this time/date work for people?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It sounds like you are concerned about continued support for existing IO's people have developed, and about backward compatibility?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We do not need to remove the Source API, and all existing Source-based connectors will continue to work [though the document proposes at some point to make Read.from(Source) translate to a wrapper SDF under the hood, to exercise the feature more and to make sure that it is strictly more powerful - but this is an optional implementation detail].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Perhaps the document phrases this too strongly - "replacing the Source API": a better phrasing would be "introducing a new API so powerful and easy-to-use that hopefully people will choose it over the Source API all the time, even though they don't have to" :) And we can discuss whether or not to actually deprecate/remove the Source API at some point down the road, once it becomes clear whether this is the case or not.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To give more context: this proposal came out of discussions within the SDK team over the past ~1.5 years, before the Beam project existed, on how to make major improvements to the Source API; perhaps it will clarify things if I give a history of the ideas discussed:
>>>>>>>>>>>>>> - The first idea was to introduce a Read.from(PCollection<Source>) transform while keeping the Source API intact - this, given appropriate implementation, would solve most of the scalability and composability issues of IO's. Then most connectors would look like: ParDo<A, Source<B>> + Read.from().
>>>>>>>>>>>>>> - Then we figured that the Source class is an unnecessary abstraction, as it simply holds data. What if we only had a Reader<S, B> class where S is the source type and B the output type? Then connectors would be something like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
>>>>>>>>>>>>>> - Then somebody remarked that some of the features of Source are useful to ParDo's as well: e.g. ability to report progress when processing a very heavy element, or ability to produce very large output in parallel.
>>>>>>>>>>>>>> - The two previous bullets were already hinting that the Read.using() primitive might not be so special: it just takes S and produces B: isn't that what a ParDo does, plus some source magic, minus the convenience of c.output() vs. the start/advance() state machine?
>>>>>>>>>>>>>> - At this point it became clear that we should explore unifying sources and ParDo's, in particular: can we bring the magic of sources to ParDo's but without the limitations and coding inconveniences? And this is how SplittableDoFn was born: bringing source magic to a DoFn by providing a RangeTracker.
>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it became clear that it is strictly more general than sources; at least, in the respect that sources have to produce output, while DoFn's don't: an SDF may very well produce no output at all, and simply perform a side effect in a parallel/resumable way.
>>>>>>>>>>>>>> - Then there were countless hours of discussions on unifying the bounded/unbounded cases, on the particulars of RangeTracker APIs reconciling parallelization and checkpointing, what the relation between SDF and DF should be, etc. They culminated in the current proposal. The proposal comes at a time when a couple of key ingredients are (almost) ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers proposal to enable unbounded work per element.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To put it shortly:
>>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and will support writing new ones, possibly forever. There is no interference with current users of Source.
>>>>>>>>>>>>>> - The new API is an attempt to improve the Source API, taken to its logical limit where it turns out that users' goals can be accomplished more easily and more generically entirely within ParDo's.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know what you think, and thanks again!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Eugene,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Just a question: why is it in DoFn and not an improvement of Source ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If I understand correctly, it means that we will have to refactor all existing IO: basically, what you propose is to remove all Source to replace with NewDoFn.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm concerned with this approach, especially in terms of timing: clearly, the IO is the area where we have to move forward in Beam as it will allow new users to start in their projects.
>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC, ... and some people started to learn the IO API (Bounded/Unbounded source, etc).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API (Source) instead of introducing a NewDoFn.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What are your thoughts for an IO writer like me ? ;)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello Beam community,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would like to propose "Splittable DoFn" - a major generalization of DoFn, which allows processing of a single element to be non-monolithic, i.e. checkpointable and parallelizable, as well as doing an unbounded amount of work per element.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This allows effectively replacing the current Bounded/UnboundedSource APIs with DoFn's that are much easier to code, more scalable and composable with the rest of the Beam programming model, and enables many use cases that were previously difficult or impossible, as well as some non-obvious new use cases.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA [BEAM-65] and some Beam meetings, and now the whole thing is written up in a document:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://s.apache.org/splittable-do-fn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here are some things that become possible with Splittable DoFn:
>>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of files
>>>>>>>>>>>>>>>> - Read a collection of files that are produced by an earlier step in the pipeline (e.g. easily implement a connector to a storage system that can export itself to files)
>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list partitions" DoFn with a DoFn that simply polls a consumer and outputs new records in a while() loop
>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that incrementally returns new files in a directory and a DoFn that tails a file
>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common" algorithm (matrix squaring) with good work balancing
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka reader written against this API:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   ProcessContinuation processElement(
>>>>>>>>>>>>>>>>       ProcessContext context, OffsetRangeTracker tracker) {
>>>>>>>>>>>>>>>>     try (KafkaConsumer<String, String> consumer =
>>>>>>>>>>>>>>>>             Kafka.subscribe(context.element().topic,
>>>>>>>>>>>>>>>>                             context.element().partition)) {
>>>>>>>>>>>>>>>>       consumer.seek(tracker.start());
>>>>>>>>>>>>>>>>       while (true) {
>>>>>>>>>>>>>>>>         ConsumerRecords<String, String> records = consumer.poll(100ms);
>>>>>>>>>>>>>>>>         if (records == null) return done();
>>>>>>>>>>>>>>>>         for (ConsumerRecord<String, String> record : records) {
>>>>>>>>>>>>>>>>           if (!tracker.tryClaim(record.offset())) {
>>>>>>>>>>>>>>>>             return resume().withFutureOutputWatermark(record.timestamp());
>>>>>>>>>>>>>>>>           }
>>>>>>>>>>>>>>>>           context.output(record);
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The document describes in detail the motivations behind this feature, the basic idea and API, open questions, and outlines an incremental delivery plan.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new DoFn [new-do-fn] and is loosely related to "State and Timers for DoFn" [beam-state].
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please take a look and comment!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>>>>>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
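
P.S. To make the tracker idea from the meeting notes a bit more concrete (a helper that lives on the worker, lets the DoFn claim positions, reports progress as a number in 0..1, and can be checkpointed), here is a minimal sketch. It is only an illustration under my own assumptions: the class name SimpleOffsetRangeTracker and the exact semantics of tryClaim(), fractionClaimed() and checkpoint() are hypothetical and are not taken from the proposal document or from any Beam release.

```java
/**
 * Hypothetical, simplified tracker for a half-open offset range [start, end).
 * This is NOT the API from the Splittable DoFn proposal; names and semantics
 * are assumptions made purely for illustration.
 */
final class SimpleOffsetRangeTracker {
  private final long start;
  private long end;          // exclusive; shrinks when a checkpoint is taken
  private long lastClaimed;  // start - 1 means "nothing claimed yet"

  SimpleOffsetRangeTracker(long start, long end) {
    if (end < start) {
      throw new IllegalArgumentException("end must be >= start");
    }
    this.start = start;
    this.end = end;
    this.lastClaimed = start - 1;
  }

  /** Claims an offset. Returns false once the offset is outside the range. */
  synchronized boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be strictly increasing");
    }
    if (offset >= end) {
      return false;  // caller should stop processing and return
    }
    lastClaimed = offset;
    return true;
  }

  /** Progress through the current range as a number in 0..1. */
  synchronized double fractionClaimed() {
    if (end == start) {
      return 1.0;  // an empty range is trivially complete
    }
    return (double) (lastClaimed + 1 - start) / (end - start);
  }

  /**
   * Splits off the unclaimed remainder. Returns {residualStart, residualEnd};
   * this tracker keeps only the already-claimed prefix.
   */
  synchronized long[] checkpoint() {
    long split = lastClaimed + 1;
    long[] residual = {split, end};
    end = split;
    return residual;
  }
}
```

In this sketch a processing loop would call tryClaim() before emitting each record and stop as soon as it returns false, while the worker harness (from another thread, hence the synchronized methods) could call checkpoint() and hand the returned residual range to a later invocation.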
