Hi Eugene, thanks for the long description! With the interleaving of execution it completely makes sense.
Best,
Aljoscha

On Sun, 21 Aug 2016 at 21:14 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Anyway, from a runner perspective, we will have some kind of API (part of
the Runner API) to "orchestrate" the SDF as we discussed during the call,
right?

Regards
JB

On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:

Hi Aljoscha,
This is an excellent question! And the answer is, we don't need any new
concepts like "SDF executor" and can rely on the per-key state and timers
machinery that already exists in all runners, because it's necessary to
implement windowing/triggering properly.

Note that this is already somewhat addressed in the previously posted State
and Timers proposal https://s.apache.org/beam-state , under "per-key
workflows".

Think of it this way, using the Kafka example: we'll expand it into a
transform:

(1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for
    partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the
    proposal/slides }

- R is the OffsetRange restriction, which in this case will always be of
  the form [startOffset, inf).
- There'll be just 1 value per key, but we use the GBK just to get access
  to the per-key state/timers machinery. This may be runner-specific; maybe
  some runners don't need a GBK to do that.

Now suppose the topic has two partitions, P1 and P2, and they get assigned
unique keys K1, K2.
Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
(K2, topic, P2, [0, inf)).
Suppose we have just 1 worker with just 1 thread. Now, how will this thread
be able to produce elements from both P1 and P2? Here's how.
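[Editorial sketch: the interleaving described here can be modeled in
miniature with plain Java. This is NOT the Beam API - the class, keys,
offsets, and batch sizes below are all invented for illustration; a "timer"
is just a queue entry and a "restriction" just the next offset to read.]

```java
import java.util.*;

// Toy model of the checkpoint/resume loop: one thread, per-key state
// holding the residual restriction (here just the next offset), and a
// timer queue through which each key asks to be resumed.
public class CooperativeResume {
    static final Map<String, Long> state = new HashMap<>(); // key -> next offset
    static final Deque<String> timers = new ArrayDeque<>(); // keys waiting to resume
    static final List<String> log = new ArrayList<>();      // observed outputs

    // One processElement call: claim up to batchSize offsets, then
    // "checkpoint" by storing the residual restriction and setting a timer.
    static void processElement(String key, long partitionEnd, int batchSize) {
        long offset = state.getOrDefault(key, 0L);
        for (int i = 0; i < batchSize && offset < partitionEnd; i++) {
            log.add(key + ":" + offset++);
        }
        if (offset < partitionEnd) { // not done: store residual, set timer
            state.put(key, offset);
            timers.addLast(key);
        } else {                     // "do not resume": state gets GC'd
            state.remove(key);
        }
    }

    // The "runner": fires timers in order until no key wants to resume.
    static List<String> run(long partitionEnd, int batchSize) {
        state.clear(); timers.clear(); log.clear();
        timers.addLast("K1");
        timers.addLast("K2");
        while (!timers.isEmpty()) {
            processElement(timers.pollFirst(), partitionEnd, batchSize);
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // Both keys make progress on one thread, two offsets at a time:
        // [K1:0, K1:1, K2:0, K2:1, K1:2, K1:3, K2:2, K2:3]
        System.out.println(run(4, 2));
    }
}
```

Note how K1 and K2 alternate in the output even though there is only one
thread - exactly the cooperative scheduling walked through below.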
The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
certain time or after a certain number of elements are output (just like
with the current UnboundedSource reading code), producing a residual
restriction R1' (basically a new start offset), put R1' into the per-key
state and set a timer T1 to resume.
Then it will process (K2, topic, P2, [0, inf)), do the same producing a
residual restriction R2' and setting a timer T2 to resume.
Then timer T1 will fire in the context of the key K1. The thread will call
processElement again, this time supplying R1' as the restriction; the
process repeats, and after a while it checkpoints and stores R1'' into the
state of K1.
Then timer T2 will fire in the context of K2, run processElement for a
while, set a new timer and store R2'' into the state of K2.
Etc.
If partition 1 goes away, the processElement call will return "do not
resume", so a timer will not be set and instead the state associated with
K1 will be GC'd.

So basically it's almost like cooperative thread scheduling: things run for
a while, until the runner tells them to checkpoint, then they set a timer
to resume themselves, the runner fires the timers, and the process repeats.
And, again, this only requires things that runners can already do - state
and timers - but no new concept of an SDF executor (and consequently no
necessity to choose/tune how many you need).

Makes sense?

On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
I have another question that I think wasn't addressed in the meeting. At
least it wasn't mentioned in the notes.

In the context of replacing sources by a combination of two SDFs, how do
you determine how many "SDF executor" instances you need downstream?
For the sake of argument, assume that both SDFs are executed with
parallelism 1 (or one per worker). Now, if you have a file source that
reads from a static set of files, the first SDF would emit the filenames
while the second SDF would receive the filenames and emit their contents.
This works well, and the downstream SDF can process one filename after the
other. Now, think of something like a Kafka source. The first SDF would
emit the partitions (say 4 partitions, in this example) and the second SDF
would be responsible for reading from a topic and emitting elements.
Reading from one topic never finishes, so you can't process the topics in
series. I think you would need to have 4 downstream "SDF executor"
instances. The question now is: how do you determine whether you are in the
first or the second situation?

Probably I'm just overlooking something and this is already dealt with
somewhere... :-)

Cheers,
Aljoscha

On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com> wrote:

Hello,

Thanks for the notes, both Dan and Eugene, and for taking the time to do
the presentation and answer our questions.

I mentioned the ongoing work on dynamic scaling on Flink because I suppose
that it will address dynamic rebalancing eventually (there are multiple
changes going on for dynamic scaling).

https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4

https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8

Anyway, I am far from an expert on Flink, but probably the Flink guys can
give their opinion about this and refer to a more precise document than the
ones I mentioned.
Thanks again,
Ismaël

On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Great summary, Eugene and Dan.

And thanks again for the details, explanation, and discussion.

Regards
JB

On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:

Thanks for attending, everybody!

Here are meeting notes (thanks Dan!).

Q: Will SplittableDoFn enable better repartitioning of the input/output
data?
A: Not really; repartitioning is orthogonal to SDF.

The current Source API suffers from lack of composition and scalability
because we treat sources too much as metadata, not enough as data.

Q (slide with transform expansion): who does the "magic"?
A: The runner. Checkpointing and dynamically splitting restrictions will
require collaboration with the runner.

Q: How does the runner interact with the DoFn to control the restrictions?
Is it related to the centralized job tracker etc.?
A: RestrictionTracker is a simple helper object that exists purely on the
worker while executing a single partition, and interacts with the worker
harness part of the runner. Not to be confused with the centralized job
tracker (master) - completely unrelated. The worker harness, of course,
interacts with the master in some relevant ways (e.g. the Dataflow master
can tell it "you're a straggler, you should split").

Q: Is this a new DoFn subclass, or how will this integrate with the
existing code?
A: It's a feature of the reflection-based DoFn
(https://s.apache.org/a-new-do-fn) - just another optional parameter of
type RestrictionTracker to processElement(), which is dynamically bound via
reflection, so it is fully backward/forward compatible and looks to users
like a regular DoFn.

Q: Why is fractionClaimed a double?
A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
rebalancing) requires a uniform way to represent progress through different
sources.

Q: The Spark runner is microbatch-based, so this seems to map well onto
checkpoint/resume, right?
A: Yes; actually the Dataflow runner is, at a worker level, also
microbatch-based. The way an SDF interacts with a runner will be very
similar to how a Bounded/UnboundedSource interacts with a runner.

Q: Using SDF, what would be the "packaging" of the IO?
A: Same as currently: package IOs as PTransforms, and their implementation
under the hood can be anything: Source, simple ParDos, SDF, etc. E.g.
Datastore was recently refactored from BoundedSource to ParDo (it ended up
simpler and more scalable), transparently to users.

Q: What's the timeline; what to do with the IOs currently in development?
A: The timeline is O(months). Keep doing what you're doing, working on top
of the Source APIs when necessary and simple ParDos otherwise.

Q: What's the impact for the runner writers?
A: Tentatively we expect that most of the code for running an SDF will be
common to runners, with some amount of per-runner glue code, just like
GBK/windowing/triggering. The impact on the Dataflow runner is larger since
it supports dynamic rebalancing in batch mode and this is the hardest part,
but for other runners it shouldn't be too hard.
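[Editorial sketch: to make the RestrictionTracker and fractionClaimed
answers above concrete, here is a toy offset-range tracker. It is NOT the
actual Beam class - the name, signatures, and numbers are invented for
illustration: tryClaim gates output, fractionClaimed reports uniform
progress in 0..1, and checkpoint() stands in for the runner-side split
signal.]

```java
// Toy offset-range restriction tracker for a restriction [start, end).
public class ToyOffsetRangeTracker {
    private final long start, end;     // restriction: offsets [start, end)
    private long lastClaimed = -1;
    private boolean stopped = false;   // set when the runner checkpoints us

    public ToyOffsetRangeTracker(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // The DoFn must claim an offset before outputting records for it;
    // false means "stop now, the runner took the rest of the range".
    public synchronized boolean tryClaim(long offset) {
        if (stopped || offset >= end) return false;
        lastClaimed = offset;
        return true;
    }

    // Progress as a number in 0..1 - a uniform signal across very
    // different sources, which is what job-wide autoscaling and dynamic
    // rebalancing need.
    public synchronized double fractionClaimed() {
        if (lastClaimed < start) return 0.0;
        return (double) (lastClaimed + 1 - start) / (end - start);
    }

    // Runner side: stop the current call and return the start of the
    // residual restriction to resume from later.
    public synchronized long checkpoint() {
        stopped = true;
        return Math.max(start, lastClaimed + 1);
    }

    public static void main(String[] args) {
        ToyOffsetRangeTracker tracker = new ToyOffsetRangeTracker(100, 200);
        tracker.tryClaim(100);
        tracker.tryClaim(149);
        System.out.println(tracker.fractionClaimed()); // halfway: 0.5
        System.out.println(tracker.checkpoint());      // residual starts at 150
    }
}
```

Because the fraction is dimensionless, a runner can compare progress of an
offset-based Kafka reader against, say, a byte-range file reader.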
JB: Talend has people who can help with this: e.g. help integrate into the
Spark runner, refactor IOs, etc. Amit is also willing to chat about
supporting SDF in the Spark runner.

Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will
send a link.

On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Eugene,

Thanks for the reminder.

Just to prepare some topics for the call, please find some points:

1. Using SDF, what would be the "packaging" of the IO? It sounds to me
that we can keep the IO packaging style (using with* setters for the IO
configuration) and replace PTransform, Source, Reader, ... directly with
SDF. Correct?

2. What's your plan in terms of release to include SDF? We have several
IOs in preparation and I wonder if it's worth starting to use the new
SDF API or not.

3. What's the impact for the runner writers? The runners will have to
support SDF, and that could be tricky depending on the execution engine. In
the worst case, where the runner can't fully support SDF, does it mean
that most of our IOs will be useless?

Just my dumb topics ;)

Thanks,
See you at 8am!

Regards
JB

On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:

Hello everybody,

Just a reminder:

The meeting is happening tomorrow - Friday, Aug 19th, 8am-9am PST; to join
the call go to
https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
I intend to go over the proposed design and then have a free-form
discussion.
Please have a skim through the proposal doc:
https://s.apache.org/splittable-do-fn
I also made some slides that are basically a trimmed-down version of the
doc, to use as a guide when conducting the meeting:
https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid> wrote:

This is pretty cool! I'll be there too. (Unless the hangout gets too full
-- if so, I'll drop out in favor of others who aren't lucky enough to get
to talk to Eugene all the time.)

On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com> wrote:

+1 I'll join

On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com> wrote:

+1, me2

On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:

+1 as in I'll join ;-)

On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

Sounds good, thanks!
Then Friday, Aug 19th it is, 8am-9am PST,
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn

On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi,

Unfortunately I will be in Ireland on August 15th. What about Friday 19th?

Regards
JB

On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:

Hi JB,

Sounds great, does the suggested time over videoconference work for you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Eugene,

May we talk together next week? I like the proposal. I would just need
some details for my understanding.

Thanks,
Regards
JB

On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:

Hi JB,

What are your thoughts on this?
I'm also thinking of having a virtual meeting to explain more about this
proposal if necessary, since I understand it is a lot to digest.

How about: Monday, Aug 15, 8am-9am Pacific time, over Hangouts? (Link:
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
- I confirmed that it can be joined without being logged into a Google
account.)

Who'd be interested in attending, and does this time/date work for people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com> wrote:

Hi JB, thanks for reading and for your comments!

It sounds like you are concerned about continued support for existing IOs
people have developed, and about backward compatibility?
We do not need to remove the Source API, and all existing Source-based
connectors will continue to work [though the document proposes at some
point to make Read.from(Source) translate to a wrapper SDF under the hood,
to exercise the feature more and to make sure that it is strictly more
powerful - but this is an optional implementation detail].

Perhaps the document phrases this too strongly - "replacing the Source
API": a better phrasing would be "introducing a new API so powerful and
easy-to-use that hopefully people will choose it over the Source API all
the time, even though they don't have to" :) And we can discuss whether or
not to actually deprecate/remove the Source API at some point down the
road, once it becomes clear whether this is the case or not.
To give more context: this proposal came out of discussions within the SDK
team over the past ~1.5 years, before the Beam project existed, on how to
make major improvements to the Source API; perhaps it will clarify things
if I give a history of the ideas discussed:

- The first idea was to introduce a Read.from(PCollection<Source>)
transform while keeping the Source API intact - this, given appropriate
implementation, would solve most of the scalability and composability
issues of IOs. Then most connectors would look like: ParDo<A, Source<B>> +
Read.from().
- Then we figured that the Source class is an unnecessary abstraction, as
it simply holds data. What if we only had a Reader<S, B> class, where S is
the source type and B the output type? Then connectors would be something
like: ParDo<A, S> + a hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of Source are useful to
ParDos as well: e.g. the ability to report progress when processing a very
heavy element, or the ability to produce very large output in parallel.
- The two previous bullets were already hinting that the Read.using()
primitive might not be so special: it just takes S and produces B - isn't
that what a ParDo does, plus some source magic, minus the convenience of
c.output() vs. the start/advance() state machine?
- At this point it became clear that we should explore unifying sources
and ParDos, in particular: can we bring the magic of sources to ParDos,
but without the limitations and coding inconveniences? And this is how
SplittableDoFn was born: bringing source magic to a DoFn by providing a
RangeTracker.
- Once the idea of "splittable DoFns" was born, it became clear that it is
strictly more general than sources; at least, in the respect that sources
have to produce output, while DoFns don't: an SDF may very well produce no
output at all, and simply perform a side effect in a parallel/resumable
way.
- Then there were countless hours of discussions on unifying the
bounded/unbounded cases, on the particulars of RangeTracker APIs
reconciling parallelization and checkpointing, on what the relation
between SDF and DoFn should be, etc. They culminated in the current
proposal. The proposal comes at a time when a couple of key ingredients
are (almost) ready: NewDoFn to make SDF look like a regular DoFn, and the
State/Timers proposal to enable unbounded work per element.
To put it shortly:
- Yes, we will support existing Source connectors, and will support
writing new ones, possibly forever. There is no interference with current
users of Source.
- The new API is an attempt to improve the Source API, taken to its
logical limit, where it turns out that users' goals can be accomplished
more easily and more generically entirely within ParDos.

Let me know what you think, and thanks again!

On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Eugene,

Just a question: why is it in DoFn and not an improvement of Source?

If I understand correctly, it means that we will have to refactor all
existing IOs: basically, what you propose is to remove all Sources and
replace them with NewDoFn.
I'm concerned with this approach, especially in terms of timing: clearly,
IO is the area where we have to move forward in Beam, as it will allow new
users to start on their projects.
So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC, ...
and some people started to learn the IO API (Bounded/Unbounded source,
etc).

I think it would make more sense to enhance the IO API (Source) instead of
introducing a NewDoFn.

What are your thoughts for IO writers like me? ;)

Regards
JB

On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:

Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose
"Splittable DoFn" - a major generalization of DoFn, which allows
processing of a single element to be non-monolithic, i.e. checkpointable
and parallelizable, as well as doing an unbounded amount of work per
element.

This allows effectively replacing the current Bounded/UnboundedSource APIs
with DoFns that are much easier to code, more scalable and composable with
the rest of the Beam programming model, and enables many use cases that
were previously difficult or impossible, as well as some non-obvious new
use cases.
This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
meetings, and now the whole thing is written up in a document:
https://s.apache.org/splittable-do-fn

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the
pipeline (e.g. easily implement a connector to a storage system that can
export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a
DoFn that simply polls a consumer and outputs new records in a while()
loop
- Implement a log tailer by composing a DoFn that incrementally returns
new files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix
squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against
this API:

  ProcessContinuation processElement(
      ProcessContext context, OffsetRangeTracker tracker) {
    try (KafkaConsumer<String, String> consumer =
        Kafka.subscribe(context.element().topic,
                        context.element().partition)) {
      consumer.seek(tracker.start());
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100ms);
        if (records == null) return done();
        for (ConsumerRecord<String, String> record : records) {
          if (!tracker.tryClaim(record.offset())) {
            return resume().withFutureOutputWatermark(record.timestamp());
          }
          context.output(record);
        }
      }
    }
  }

The document describes in detail the motivations behind this feature, the
basic idea and API, open questions, and outlines an incremental delivery
plan.

The proposed API builds on the reflection-based new DoFn [new-do-fn] and
is loosely related to "State and Timers for DoFn" [beam-state].

Please take a look and comment!

Thanks.

[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

--
Thanks,
Andrew

Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>