Hi, I have another question that I think wasn't addressed in the meeting. At least it wasn't mentioned in the notes.
In the context of replacing sources by a combination of two SDFs, how do you determine how many "SDF executor" instances you need downstream? For the sake of argument assume that both SDFs are executed with parallelism 1 (or one per worker).

Now, if you have a file source that reads from a static set of files, the first SDF would emit the filenames while the second SDF would receive the filenames and emit their contents. This works well, and the downstream SDF can process one filename after the other.

Now, think of something like a Kafka source. The first SDF would emit the partitions (say 4 partitions, in this example) and the second SDF would be responsible for reading from a partition and emitting elements. Reading from one partition never finishes, so you can't process the partitions in series. I think you would need to have 4 downstream "SDF executor" instances.

The question now is: how do you determine whether you are in the first or the second situation?

Probably I'm just overlooking something and this is already dealt with somewhere... :-)

Cheers,
Aljoscha

On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]> wrote:

> Hello,
>
> Thanks for the notes both Dan and Eugene, and for taking the time to do the
> presentation and answer our questions.
>
> I mentioned the ongoing work on dynamic scaling on Flink because I suppose
> that it will address dynamic rebalancing eventually (there are multiple
> changes going on for dynamic scaling).
>
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
>
> https://lists.apache.org/[email protected]:lte=1M:FLIP-8
>
> Anyway I am far from an expert on Flink, but probably the Flink guys can
> give their opinion about this and refer to a more precise document than the
> ones I mentioned.
>
> Thanks again,
> Ismaël
>
> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
> > Great summary Eugene and Dan.
> >
> > And thanks again for the details, explanation, and discussion.
> >
> > Regards
> > JB
> >
> >
> > On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
> >
> >> Thanks for attending, everybody!
> >>
> >> Here are meeting notes (thanks Dan!).
> >>
> >> Q: Will SplittableDoFn enable better repartitioning of the input/output
> >> data?
> >> A: Not really; repartitioning is orthogonal to SDF.
> >>
> >> Current Source API suffers from lack of composition and scalability
> >> because we treat sources too much as metadata, not enough as data.
> >>
> >> Q (slide with transform expansion): who does the "magic"?
> >> A: The runner. Checkpointing and dynamically splitting restrictions will
> >> require collaboration with the runner.
> >>
> >> Q: How does the runner interact with the DoFn to control the restrictions?
> >> Is it related to the centralized job tracker etc.?
> >> A: RestrictionTracker is a simple helper object that exists purely on the
> >> worker while executing a single partition, and interacts with the worker
> >> harness part of the runner. Not to be confused with the centralized job
> >> tracker (master) - completely unrelated. Worker harness, of course,
> >> interacts with the master in some relevant ways (e.g. Dataflow master can
> >> tell "you're a straggler, you should split").
> >>
> >> Q: Is this a new DoFn subclass, or how will this integrate with the
> >> existing code?
> >> A: It's a feature of reflection-based DoFn (https://s.apache.org/a-new-do-fn)
> >> - just another optional parameter of type RestrictionTracker to
> >> processElement() which is dynamically bound via reflection, so fully
> >> backward/forward compatible, and looks to users like a regular DoFn.
> >>
> >> Q: why is fractionClaimed a double?
> >> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
> >> rebalancing) requires a uniform way to represent progress through
> >> different sources.
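The fractionClaimed answer in the notes above can be made concrete with a small sketch. This is only an illustration under assumptions, not the API from the proposal: the class name, the helper fractionClaimed(start, end, lastClaimed), and the choice of a half-open offset range are all hypothetical. It shows how progress through an offset range can be reduced to a number in 0..1, which is the uniform representation the notes refer to.

```java
public class FractionClaimedSketch {
    /**
     * Hypothetical helper: fraction of the half-open offset range [start, end)
     * that has been claimed, given the last claimed offset.
     */
    static double fractionClaimed(long start, long end, long lastClaimed) {
        if (end <= start) {
            return 1.0; // empty range: nothing left to claim
        }
        if (lastClaimed < start) {
            return 0.0; // nothing claimed yet
        }
        double fraction = (double) (lastClaimed - start + 1) / (end - start);
        return Math.min(1.0, fraction); // clamp in case the claim ran past the end
    }

    public static void main(String[] args) {
        // Offsets 100..149 of the range [100, 200) have been claimed.
        System.out.println(fractionClaimed(100, 200, 149));
    }
}
```

A file reader could report bytes read over file size in the same way, so a runner would only ever compare numbers in 0..1 regardless of the kind of source.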
> >>
> >> Q: Spark runner is microbatch-based, so this seems to map well onto
> >> checkpoint/resume, right?
> >> A: Yes; actually the Dataflow runner is, at a worker level, also
> >> microbatch-based. The way SDF interacts with a runner will be very similar
> >> to how a Bounded/UnboundedSource interacts with a runner.
> >>
> >> Q: Using SDF, what would be the "packaging" of the IO?
> >> A: Same as currently: package IO's as PTransforms and their implementation
> >> under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g.
> >> Datastore was recently refactored from BoundedSource to ParDo (ended up
> >> simpler and more scalable), transparently to users.
> >>
> >> Q: What's the timeline; what to do with the IOs currently in development?
> >> A: Timeline is O(months). Keep doing what you're doing and working on top
> >> of Source APIs when necessary and simple ParDo's otherwise.
> >>
> >> Q: What's the impact for the runner writers?
> >> A: Tentatively expected that most of the code for running an SDF will be
> >> common to runners, with some amount of per-runner glue code, just like
> >> GBK/windowing/triggering. Impact on Dataflow runner is larger since it
> >> supports dynamic rebalancing in batch mode and this is the hardest part,
> >> but for other runners shouldn't be too hard.
> >>
> >> JB: Talend has people who can help with this: e.g. help integrate into
> >> Spark runner, refactor IOs etc. Amit also willing to chat about supporting
> >> SDF in Spark runner.
> >>
> >> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will
> >> send a link.
> >>
> >>
> >> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <[email protected]>
> >> wrote:
> >>
> >>> Hi Eugene,
> >>>
> >>> thanks for the reminder.
> >>>
> >>> Just to prepare some topics for the call, please find some points:
> >>>
> >>> 1. Using SDF, what would be the "packaging" of the IO? It sounds to me
> >>> that we can keep the IO packaging style (using with* setters for the IO
> >>> configuration) and replace PTransform, Source, Reader, ... directly with
> >>> SDF. Correct?
> >>>
> >>> 2. What's your plan in terms of release to include SDF? We have several
> >>> IOs in preparation and I wonder if it's worth starting to use the new
> >>> SDF API or not.
> >>>
> >>> 3. What's the impact for the runner writers? The runners will have to
> >>> support SDF, which could be tricky depending on the execution engine. In
> >>> the worst case where the runner can't fully support SDF, does it mean
> >>> that most of our IOs will be useless?
> >>>
> >>> Just my dumb topics ;)
> >>>
> >>> Thanks,
> >>> See you at 8am!
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
> >>>
> >>>> Hello everybody,
> >>>>
> >>>> Just a reminder:
> >>>>
> >>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to join
> >>>> the call go to
> >>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
> >>>> I intend to go over the proposed design and then have a free-form
> >>>> discussion.
> >>>>
> >>>> Please have a skim through the proposal doc:
> >>>> https://s.apache.org/splittable-do-fn
> >>>> I also made some slides that are basically a trimmed-down version of the
> >>>> doc to use as a guide when conducting the meeting,
> >>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
> >>>> .
> >>>>
> >>>> I will post notes from the meeting on this thread afterwards.
> >>>>
> >>>> Thanks, looking forward.
> >>>>
> >>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <[email protected]>
> >>>> wrote:
> >>>>
> >>>>> This is pretty cool! I'll be there too. (unless the hangout gets too full
> >>>>> -- if so, I'll drop out in favor of others who aren't lucky enough to get
> >>>>> to talk to Eugene all the time.)
> >>>>>
> >>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <[email protected]>
> >>>>> wrote:
> >>>>>
> >>>>>> +1 I'll join
> >>>>>>
> >>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <[email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> + 1, me2
> >>>>>>>
> >>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> +1 as in I'll join ;-)
> >>>>>>>>
> >>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <[email protected]>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Sounds good, thanks!
> >>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
> >>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> >>>>>>>>>
> >>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <[email protected]>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi
> >>>>>>>>>>
> >>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What about Friday
> >>>>>>>>>> 19th?
> >>>>>>>>>>
> >>>>>>>>>> Regards
> >>>>>>>>>> JB
> >>>>>>>>>>
> >>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> >>>>>>>>>> <[email protected]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi JB,
> >>>>>>>>>>>
> >>>>>>>>>>> Sounds great, does the suggested time over videoconference work for
> >>>>>>>>>>> you?
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <[email protected]>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Eugene
> >>>>>>>>>>>>
> >>>>>>>>>>>> May we talk together next week? I like the proposal. I would just need
> >>>>>>>>>>>> some details for my understanding.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks
> >>>>>>>>>>>> Regards
> >>>>>>>>>>>> JB
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> >>>>>>>>>>>> <[email protected]> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi JB,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What are your thoughts on this?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain more about this
> >>>>>>>>>>>>> proposal if necessary, since I understand it is a lot to digest.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> >>>>>>>>>>>>> (link:
> >>>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> >>>>>>>>>>>>> - I confirmed that it can be joined without being logged into a Google
> >>>>>>>>>>>>> account)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Who'd be interested in attending, and does this time/date work for
> >>>>>>>>>>>>> people?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <[email protected]>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> It sounds like you are concerned about continued support for existing IO's
> >>>>>>>>>>>>>> people have developed, and about backward compatibility?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We do not need to remove the Source API, and all existing Source-based
> >>>>>>>>>>>>>> connectors will continue to work [though the document proposes at some
> >>>>>>>>>>>>>> point to make Read.from(Source) translate to a wrapper SDF under the
> >>>>>>>>>>>>>> hood, to exercise the feature more and to make sure that it is strictly
> >>>>>>>>>>>>>> more powerful - but this is an optional implementation detail].
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Perhaps the document phrases this too strongly - "replacing the Source
> >>>>>>>>>>>>>> API": a better phrasing would be "introducing a new API so powerful and
> >>>>>>>>>>>>>> easy-to-use that hopefully people will choose it over the Source API all
> >>>>>>>>>>>>>> the time, even though they don't have to" :) And we can discuss whether or
> >>>>>>>>>>>>>> not to actually deprecate/remove the Source API at some point down the
> >>>>>>>>>>>>>> road, once it becomes clear whether this is the case or not.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> To give more context: this proposal came out of discussions within the SDK
> >>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam project existed, on how to
> >>>>>>>>>>>>>> make major improvements to the Source API; perhaps it will clarify things
> >>>>>>>>>>>>>> if I give a history of the ideas discussed:
> >>>>>>>>>>>>>> - The first idea was to introduce a Read.from(PCollection<Source>)
> >>>>>>>>>>>>>> transform while keeping the Source API intact - this, given appropriate
> >>>>>>>>>>>>>> implementation, would solve most of the scalability and composability
> >>>>>>>>>>>>>> issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
> >>>>>>>>>>>>>> + Read.from().
> >>>>>>>>>>>>>> - Then we figured that the Source class is an unnecessary abstraction, as
> >>>>>>>>>>>>>> it simply holds data. What if we only had a Reader<S, B> class where S is
> >>>>>>>>>>>>>> the source type and B the output type? Then connectors would be something
> >>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> >>>>>>>>>>>>>> - Then somebody remarked that some of the features of Source are useful to
> >>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress when processing a very
> >>>>>>>>>>>>>> heavy element, or ability to produce very large output in parallel.
> >>>>>>>>>>>>>> - The two previous bullets were already hinting that the Read.using()
> >>>>>>>>>>>>>> primitive might not be so special: it just takes S and produces B: isn't
> >>>>>>>>>>>>>> that what a ParDo does, plus some source magic, minus the convenience of
> >>>>>>>>>>>>>> c.output() vs. the start/advance() state machine?
> >>>>>>>>>>>>>> - At this point it became clear that we should explore unifying sources
> >>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic of sources to ParDo's
> >>>>>>>>>>>>>> but without the limitations and coding inconveniences? And this is how
> >>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic to a DoFn by providing a
> >>>>>>>>>>>>>> RangeTracker.
> >>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it became clear that it
> >>>>>>>>>>>>>> is strictly more general than sources; at least, in the respect that
> >>>>>>>>>>>>>> sources have to produce output, while DoFn's don't: an SDF may very well
> >>>>>>>>>>>>>> produce no output at all, and simply perform a side effect in a
> >>>>>>>>>>>>>> parallel/resumable way.
> >>>>>>>>>>>>>> - Then there were countless hours of discussions on unifying the
> >>>>>>>>>>>>>> bounded/unbounded cases, on the particulars of RangeTracker APIs
> >>>>>>>>>>>>>> reconciling parallelization and checkpointing, what the relation between
> >>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the current proposal. The
> >>>>>>>>>>>>>> proposal comes at a time when a couple of key ingredients are (almost)
> >>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
> >>>>>>>>>>>>>> proposal to enable unbounded work per element.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> To put it shortly:
> >>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and will support
> >>>>>>>>>>>>>> writing new ones, possibly forever. There is no interference with current
> >>>>>>>>>>>>>> users of Source.
> >>>>>>>>>>>>>> - The new API is an attempt to improve the Source API, taken to its
> >>>>>>>>>>>>>> logical limit where it turns out that users' goals can be accomplished
> >>>>>>>>>>>>>> more easily and more generically entirely within ParDo's.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Let me know what you think, and thanks again!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <[email protected]>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Eugene,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Just a question: why is it in DoFn and not an improvement of Source?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If I understand correctly, it means that we will have to refactor all
> >>>>>>>>>>>>>>> existing IO: basically, what you propose is to remove all Source to
> >>>>>>>>>>>>>>> replace with NewDoFn.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm concerned with this approach, especially in terms of timing: clearly,
> >>>>>>>>>>>>>>> the IO is the area where we have to move forward in Beam as it will
> >>>>>>>>>>>>>>> allow new users to start in their projects.
> >>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
> >>>>>>>>>>>>>>> ... and some people started to learn the IO API (Bounded/Unbounded
> >>>>>>>>>>>>>>> source, etc).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API (Source) instead
> >>>>>>>>>>>>>>> of introducing a NewDoFn.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What are your thoughts for IO writers like me? ;)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regards
> >>>>>>>>>>>>>>> JB
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello Beam community,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> >>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn, which allows processing
> >>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e. checkpointable and
> >>>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded amount of work per element.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This allows effectively replacing the current Bounded/UnboundedSource APIs
> >>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more scalable and composable with
> >>>>>>>>>>>>>>>> the rest of the Beam programming model, and enables many use cases that
> >>>>>>>>>>>>>>>> were previously difficult or impossible, as well as some non-obvious new
> >>>>>>>>>>>>>>>> use cases.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> >>>>>>>>>>>>>>>> meetings, and now the whole thing is written up in a document:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> https://s.apache.org/splittable-do-fn
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Here are some things that become possible with Splittable DoFn:
> >>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of files
> >>>>>>>>>>>>>>>> - Read a collection of files that are produced by an earlier step in the
> >>>>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a storage system that can
> >>>>>>>>>>>>>>>> export itself to files)
> >>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list partitions" DoFn with a
> >>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new records in a while()
> >>>>>>>>>>>>>>>> loop
> >>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that incrementally returns new
> >>>>>>>>>>>>>>>> files in a directory and a DoFn that tails a file
> >>>>>>>>>>>>>>>> - Implement a parallel "count friends in common" algorithm (matrix
> >>>>>>>>>>>>>>>> squaring) with good work balancing
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka reader written against
> >>>>>>>>>>>>>>>> this API:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>     ProcessContinuation processElement(
> >>>>>>>>>>>>>>>>             ProcessContext context, OffsetRangeTracker tracker) {
> >>>>>>>>>>>>>>>>       try (KafkaConsumer<String, String> consumer =
> >>>>>>>>>>>>>>>>                 Kafka.subscribe(context.element().topic,
> >>>>>>>>>>>>>>>>                                 context.element().partition)) {
> >>>>>>>>>>>>>>>>         consumer.seek(tracker.start());
> >>>>>>>>>>>>>>>>         while (true) {
> >>>>>>>>>>>>>>>>           ConsumerRecords<String, String> records = consumer.poll(100ms);
> >>>>>>>>>>>>>>>>           if (records == null) return done();
> >>>>>>>>>>>>>>>>           for (ConsumerRecord<String, String> record : records) {
> >>>>>>>>>>>>>>>>             if (!tracker.tryClaim(record.offset())) {
> >>>>>>>>>>>>>>>>               return resume().withFutureOutputWatermark(record.timestamp());
> >>>>>>>>>>>>>>>>             }
> >>>>>>>>>>>>>>>>             context.output(record);
> >>>>>>>>>>>>>>>>           }
> >>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>       }
> >>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The document describes in detail the motivations behind this feature, the
> >>>>>>>>>>>>>>>> basic idea and API, open questions, and outlines an incremental delivery
> >>>>>>>>>>>>>>>> plan.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
> >>>>>>>>>>>>>>>> loosely related to "State and Timers for DoFn" [beam-state].
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Please take a look and comment!
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> >>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
> >>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> Jean-Baptiste Onofré
> >>>>>>>>>>>>>>> [email protected]
> >>>>>>>>>>>>>>> http://blog.nanthrax.net
> >>>>>>>>>>>>>>> Talend - http://www.talend.com
> >>>>>>>>>>>>>>>
> >>>>>> --
> >>>>>> Thanks,
> >>>>>> Andrew
> >>>>>>
> >>>>>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> >>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> >>>>>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
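
The Kafka case raised at the top of this message can be illustrated with a small sketch. It is not an answer taken from the thread: it only assumes that the checkpoint/resume behaviour mentioned in the meeting notes and the resume() return value in the quoted Kafka reader let a runner stop an element after a bounded slice of work and pick it up again later. Under that assumption a single "SDF executor" could rotate through 4 never-finishing partitions instead of needing 4 instances. Partition, processSlice and run are hypothetical names, and the "records" are generated locally rather than read from Kafka.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class InterleavedExecutorSketch {
    /** Hypothetical stand-in for one unbounded restriction, e.g. a single Kafka partition. */
    static final class Partition {
        final int id;
        long nextOffset = 0;

        Partition(int id) {
            this.id = id;
        }
    }

    /**
     * Processes up to `budget` records from one partition, then checkpoints.
     * Returns true to signal "resume me later" (the analogue of resume() in
     * the quoted proposal); an unbounded partition never reports done.
     */
    static boolean processSlice(Partition p, int budget, List<String> out) {
        for (int i = 0; i < budget; i++) {
            out.add("p" + p.id + "@" + p.nextOffset);
            p.nextOffset++;
        }
        return true;
    }

    /** Runs `rounds` slices on a single executor, rotating through the partitions. */
    static List<String> run(int partitions, int budget, int rounds) {
        Deque<Partition> queue = new ArrayDeque<>();
        for (int i = 0; i < partitions; i++) {
            queue.add(new Partition(i));
        }
        List<String> out = new ArrayList<>();
        for (int r = 0; r < rounds && !queue.isEmpty(); r++) {
            Partition p = queue.poll();
            if (processSlice(p, budget, out)) {
                queue.add(p); // re-enqueue the checkpointed remainder
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 4 partitions, 2 records per slice, 8 slices: each partition is visited twice.
        System.out.println(run(4, 2, 8));
    }
}
```

In the static-files case the same loop would simply drain: a slice that reaches the end of its file would report done and not be re-enqueued. Whether a given runner schedules elements this way is an open point of the question, not something the sketch settles.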
