Hi, I have another question about this: currently, unbounded sources have special logic for determining the watermark, and the system periodically asks the sources for the current watermark. As I understood it, watermarks are only "generated" at the sources. How will this work when sources are implemented as a combination of DoFns and SplittableDoFns? Will SplittableDoFns be asked for a watermark, and does this mean that watermarks can then be "generated" at any operation?
Cheers,
Aljoscha

On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <[email protected]> wrote:

Hi JB,

Yes, I'm assuming you're referring to the "magic" part on the transform expansion diagram. This is indeed runner-specific, and timers+state are likely the simplest way to do this for an SDF that does an unbounded amount of work.

On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <[email protected]> wrote:

Anyway, from a runner perspective, we will have a kind of API (part of the Runner API) to "orchestrate" the SDF as we discussed during the call, right?

Regards
JB

On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:

Hi Aljoscha,

This is an excellent question! And the answer is, we don't need any new concepts like an "SDF executor" and can rely on the per-key state and timers machinery that already exists in all runners because it's necessary to implement windowing/triggering properly.

Note that this is already somewhat addressed in the previously posted State and Timers proposal https://s.apache.org/beam-state , under "per-key workflows".

Think of it this way, using the Kafka example: we'll expand it into a transform:

(1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the proposal/slides }

- R is the OffsetRange restriction, which in this case will always be of the form [startOffset, inf).
- There'll be just one value per key, but we use the GBK just to get access to the per-key state/timers machinery. This may be runner-specific; maybe some runners don't need a GBK to do that.

Now suppose the topic has two partitions, P1 and P2, and they get assigned unique keys K1, K2. Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)), (K2, topic, P2, [0, inf)).

Suppose we have just one worker with just one thread. Now, how will this thread be able to produce elements from both P1 and P2? Here's how.

The thread will process (K1, topic, P1, [0, inf)), checkpoint after a certain time or after a certain number of elements are output (just like with the current UnboundedSource reading code), producing a residual restriction R1' (basically a new start offset), put R1' into the per-key state, and set a timer T1 to resume.
Then it will process (K2, topic, P2, [0, inf)), do the same, producing a residual restriction R2' and setting a timer T2 to resume.
Then timer T1 will fire in the context of the key K1. The thread will call processElement again, this time supplying R1' as the restriction; the process repeats, and after a while it checkpoints and stores R1'' into the state of K1.
Then timer T2 will fire in the context of K2, run processElement for a while, set a new timer and store R2'' into the state of K2.
Etc.
If partition 1 goes away, the processElement call will return "do not resume", so a timer will not be set and instead the state associated with K1 will be GC'd.
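For illustration, here is a rough sketch of what this expansion could look like in the Java SDK. The names KafkaReadViaSdf, PartitionRestriction, listPartitions() and ReadPartitionFn are hypothetical and simplified (they are not actual Beam APIs), and, as noted above, the GBK step may be runner-specific:

import java.io.Serializable;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical "Kafka as a composite of DoFns" transform, mirroring steps (1)-(3) above.
public class KafkaReadViaSdf extends PTransform<PBegin, PCollection<String>> {

  // Hypothetical restriction value: which partition to read and from which offset.
  public static class PartitionRestriction implements Serializable {
    public final String topic;
    public final int partition;
    public final long startOffset;  // the restriction is the range [startOffset, inf)

    public PartitionRestriction(String topic, int partition, long startOffset) {
      this.topic = topic;
      this.partition = partition;
      this.startOffset = startOffset;
    }
  }

  private final String topic;

  public KafkaReadViaSdf(String topic) {
    this.topic = topic;
  }

  @Override
  public PCollection<String> expand(PBegin input) {
    return input
        .apply(Create.of(topic))
        // (1) List the partitions of the topic; give each one a unique key so
        //     that it gets its own per-key state and timers downstream.
        .apply("ListPartitions",
            ParDo.of(new DoFn<String, KV<String, PartitionRestriction>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                for (int partition : listPartitions(c.element())) {  // hypothetical helper
                  c.output(KV.of(c.element() + "#" + partition,
                      new PartitionRestriction(c.element(), partition, 0L)));
                }
              }
            }))
        // (2) GroupByKey purely to gain access to the per-key state/timers
        //     machinery (exactly one value per key; possibly runner-specific).
        .apply(GroupByKey.create())
        // (3) The splittable DoFn sketched in the proposal/slides: reads one
        //     partition, checkpoints its [startOffset, inf) restriction, and
        //     resumes via timers. Assumed here to emit record values as Strings.
        .apply("ReadPartition", ParDo.of(new ReadPartitionFn()));
  }
}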
So basically it's almost like cooperative thread scheduling: things run for a while, until the runner tells them to checkpoint; then they set a timer to resume themselves, the runner fires the timers, and the process repeats. And, again, this only requires things that runners can already do - state and timers - but no new concept of an SDF executor (and consequently no necessity to choose/tune how many you need).

Makes sense?

On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <[email protected]> wrote:

Hi,

I have another question that I think wasn't addressed in the meeting. At least it wasn't mentioned in the notes.

In the context of replacing sources by a combination of two SDFs, how do you determine how many "SDF executor" instances you need downstream? For the sake of argument, assume that both SDFs are executed with parallelism 1 (or one per worker). Now, if you have a file source that reads from a static set of files, the first SDF would emit the filenames while the second SDF would receive the filenames and emit their contents. This works well, and the downstream SDF can process one filename after the other. Now, think of something like a Kafka source. The first SDF would emit the partitions (say 4 partitions, in this example) and the second SDF would be responsible for reading from a topic and emitting elements. Reading from one topic never finishes, so you can't process the topics in series. I think you would need to have 4 downstream "SDF executor" instances. The question now is: how do you determine whether you are in the first or the second situation?

Probably I'm just overlooking something and this is already dealt with somewhere... :-)

Cheers,
Aljoscha

On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]> wrote:

Hello,

Thanks for the notes, both Dan and Eugene, and for taking the time to do the presentation and answer our questions.

I mentioned the ongoing work on dynamic scaling on Flink because I suppose that it will address dynamic rebalancing eventually (there are multiple changes going on for dynamic scaling).

https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
https://lists.apache.org/[email protected]:lte=1M:FLIP-8

Anyway, I am far from an expert on Flink, but probably the Flink guys can give their opinion about this and refer to a more precise document than the ones I mentioned.

Thanks again,
Ismaël

On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <[email protected]> wrote:

Great summary, Eugene and Dan.

And thanks again for the details, explanation, and discussion.

Regards
JB

On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:

Thanks for attending, everybody!

Here are the meeting notes (thanks Dan!).

Q: Will SplittableDoFn enable better repartitioning of the input/output data?
A: Not really; repartitioning is orthogonal to SDF.

The current Source API suffers from a lack of composition and scalability because we treat sources too much as metadata, not enough as data.

Q (slide with transform expansion): who does the "magic"?
A: The runner. Checkpointing and dynamically splitting restrictions will require collaboration with the runner.

Q: How does the runner interact with the DoFn to control the restrictions? Is it related to the centralized job tracker etc.?
A: RestrictionTracker is a simple helper object that exists purely on the worker while executing a single partition, and interacts with the worker-harness part of the runner. Not to be confused with the centralized job tracker (master) - completely unrelated. The worker harness, of course, interacts with the master in some relevant ways (e.g. the Dataflow master can tell it "you're a straggler, you should split").

Q: Is this a new DoFn subclass, or how will this integrate with the existing code?
A: It's a feature of the reflection-based DoFn (https://s.apache.org/a-new-do-fn) - just another optional parameter of type RestrictionTracker on processElement(), which is dynamically bound via reflection, so it is fully backward/forward compatible and looks to users like a regular DoFn. (A rough sketch of what this could look like follows these notes.)

Q: Why is fractionClaimed a double?
A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic rebalancing) requires a uniform way to represent progress through different sources.

Q: The Spark runner is microbatch-based, so this seems to map well onto checkpoint/resume, right?
A: Yes; actually the Dataflow runner is, at a worker level, also microbatch-based. The way an SDF interacts with a runner will be very similar to how a Bounded/UnboundedSource interacts with a runner.

Q: Using SDF, what would be the "packaging" of the IO?
A: Same as currently: package IOs as PTransforms, and their implementation under the hood can be anything: Source, simple ParDos, SDF, etc. E.g. Datastore was recently refactored from BoundedSource to ParDo (it ended up simpler and more scalable), transparently to users.

Q: What's the timeline; what to do with the IOs currently in development?
A: The timeline is O(months). Keep doing what you're doing: work on top of the Source APIs when necessary and simple ParDos otherwise.

Q: What's the impact for runner writers?
A: Tentatively, we expect that most of the code for running an SDF will be common to all runners, with some amount of per-runner glue code, just like GBK/windowing/triggering. The impact on the Dataflow runner is larger since it supports dynamic rebalancing in batch mode and this is the hardest part, but for other runners it shouldn't be too hard.

JB: Talend has people who can help with this: e.g. help integrate into the Spark runner, refactor IOs, etc. Amit is also willing to chat about supporting SDF in the Spark runner.

Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will send a link.
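For illustration, a rough sketch of the restriction tracker and the reflection-based signature mentioned in the notes above. The names and signatures here are simplified assumptions, not the final API from the proposal doc:

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical tracker for an offset range [start, end).
// fractionClaimed() is a double in 0..1 so that progress is reported the
// same way regardless of what kind of restriction is being processed.
class OffsetRangeTracker {
  final long start;
  private long end;          // the runner may trim this on checkpoint/split
  private long lastClaimed;

  OffsetRangeTracker(long start, long end) {
    this.start = start;
    this.end = end;
    this.lastClaimed = start - 1;
  }

  // Returns false once the offset falls outside the (possibly trimmed) range;
  // the DoFn must then stop claiming work and return a "resume" continuation.
  synchronized boolean tryClaim(long offset) {
    if (offset >= end) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  synchronized double fractionClaimed() {
    if (end == Long.MAX_VALUE) {
      return 0.0;  // progress through an unbounded range is unknown
    }
    return Math.min(1.0, (lastClaimed + 1 - start) / (double) (end - start));
  }
}

// A splittable DoFn is written like any other reflection-based DoFn; the extra
// tracker parameter on processElement() is what marks it as splittable. (The
// methods that define the initial restriction and its coder are omitted here.)
class EmitOffsetsFn extends DoFn<Long, Long> {
  @ProcessElement
  public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
    for (long offset = tracker.start; tracker.tryClaim(offset); ++offset) {
      c.output(offset);
    }
  }
}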
On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <[email protected]> wrote:

Hi Eugene,

Thanks for the reminder.

Just to prepare some topics for the call, please find some points:

1. Using SDF, what would be the "packaging" of the IO? It sounds to me that we can keep the IO packaging style (using with* setters for the IO configuration) and replace PTransform, Source, Reader, ... directly with SDF. Correct?

2. What's your plan in terms of release to include SDF? We have several IOs in preparation and I wonder if it's worth starting to use the new SDF API or not.

3. What's the impact for the runner writers? The runners will have to support SDF, and that could be tricky depending on the execution engine. In the worst case, where the runner can't fully support SDF, does it mean that most of our IOs will be useless?

Just my dumb topics ;)

Thanks,
See you at 8am!

Regards
JB

On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:

Hello everybody,

Just a reminder:

The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST. To join the call, go to https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
I intend to go over the proposed design and then have a free-form discussion.

Please have a skim through the proposal doc: https://s.apache.org/splittable-do-fn
I also made some slides that are basically a trimmed-down version of the doc, to use as a guide when conducting the meeting:
https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <[email protected]> wrote:

This is pretty cool! I'll be there too. (Unless the hangout gets too full -- if so, I'll drop out in favor of others who aren't lucky enough to get to talk to Eugene all the time.)
On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <[email protected]> wrote:

+1 I'll join

On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <[email protected]> wrote:

+ 1, me2

On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]> wrote:

+1 as in I'll join ;-)

On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <[email protected]> wrote:

Sounds good, thanks!
Then Friday Aug 19th it is, 8am-9am PST,
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn

On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <[email protected]> wrote:

Hi

Unfortunately I will be in Ireland on August 15th. What about Friday 19th?

Regards
JB

On Aug 11, 2016, at 23:22, Eugene Kirpichov <[email protected]> wrote:

Hi JB,

Sounds great, does the suggested time over videoconference work for you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <[email protected]> wrote:

Hi Eugene

May we talk together next week? I like the proposal. I would just need some details for my understanding.

Thanks
Regards
JB

On Aug 11, 2016, at 19:46, Eugene Kirpichov <[email protected]> wrote:

Hi JB,

What are your thoughts on this?

I'm also thinking of having a virtual meeting to explain more about this proposal if necessary, since I understand it is a lot to digest.

How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts? (link:
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
- I confirmed that it can be joined without being logged into a Google account.)

Who'd be interested in attending, and does this time/date work for people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <[email protected]> wrote:

Hi JB, thanks for reading and for your comments!

It sounds like you are concerned about continued support for existing IOs people have developed, and about backward compatibility?

We do not need to remove the Source API, and all existing Source-based connectors will continue to work [though the document proposes at some point to make Read.from(Source) translate to a wrapper SDF under the hood, to exercise the feature more and to make sure that it is strictly more powerful - but this is an optional implementation detail].

Perhaps the document phrases this too strongly - "replacing the Source API": a better phrasing would be "introducing a new API so powerful and easy-to-use that hopefully people will choose it over the Source API all the time, even though they don't have to" :) And we can discuss whether or not to actually deprecate/remove the Source API at some point down the road, once it becomes clear whether this is the case or not.
To give more context: this proposal came out of discussions within the SDK team over the past ~1.5 years, before the Beam project existed, on how to make major improvements to the Source API; perhaps it will clarify things if I give a history of the ideas discussed:

- The first idea was to introduce a Read.from(PCollection<Source>) transform while keeping the Source API intact - this, given an appropriate implementation, would solve most of the scalability and composability issues of IOs. Then most connectors would look like: ParDo<A, Source<B>> + Read.from().
- Then we figured that the Source class is an unnecessary abstraction, as it simply holds data. What if we only had a Reader<S, B> class, where S is the source type and B the output type? Then connectors would be something like: ParDo<A, S> + a hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of Source are useful to ParDos as well: e.g. the ability to report progress when processing a very heavy element, or the ability to produce very large output in parallel.
- The two previous bullets were already hinting that the Read.using() primitive might not be so special: it just takes S and produces B - isn't that what a ParDo does, plus some source magic, minus the convenience of c.output() vs. the start()/advance() state machine?
- At this point it became clear that we should explore unifying sources and ParDos, in particular: can we bring the magic of sources to ParDos, but without the limitations and coding inconveniences? And this is how SplittableDoFn was born: bringing source magic to a DoFn by providing a RangeTracker.
- Once the idea of "splittable DoFns" was born, it became clear that it is strictly more general than sources; at least, in the respect that sources have to produce output while DoFns don't: an SDF may very well produce no output at all, and simply perform a side effect in a parallel/resumable way.
- Then there were countless hours of discussions on unifying the bounded/unbounded cases, on the particulars of RangeTracker APIs reconciling parallelization and checkpointing, on what the relation between SDF and DoFn should be, etc. They culminated in the current proposal. The proposal comes at a time when a couple of key ingredients are (almost) ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers proposal to enable unbounded work per element.

To put it shortly:
- Yes, we will support existing Source connectors, and will support writing new ones, possibly forever. There is no interference with current users of Source.
- The new API is an attempt to improve the Source API, taken to its logical limit, where it turns out that users' goals can be accomplished more easily and more generically entirely within ParDos.

Let me know what you think, and thanks again!

On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <[email protected]> wrote:

Hi Eugene,

Just a question: why is it in DoFn and not an improvement of Source?

If I understand correctly, it means that we will have to refactor all existing IOs: basically, what you propose is to remove all Sources and replace them with NewDoFn.

I'm concerned with this approach, especially in terms of timing: clearly, IO is the area where we have to move forward in Beam, as it will allow new users to start on their projects.
So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC, ... and some people started to learn the IO API (Bounded/Unbounded source, etc).

I think it would make more sense to enhance the IO API (Source) instead of introducing a NewDoFn.

What are your thoughts for IO writers like me? ;)

Regards
JB

On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose "Splittable DoFn" - a major generalization of DoFn, which allows processing of a single element to be non-monolithic, i.e. checkpointable and parallelizable, as well as doing an unbounded amount of work per element.

This allows effectively replacing the current Bounded/UnboundedSource APIs with DoFns that are much easier to code, more scalable and composable with the rest of the Beam programming model, and enables many use cases that were previously difficult or impossible, as well as some non-obvious new use cases.

This proposal has been mentioned before in JIRA [BEAM-65] and some Beam meetings, and now the whole thing is written up in a document:

https://s.apache.org/splittable-do-fn

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the pipeline (e.g. easily implement a connector to a storage system that can export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a DoFn that simply polls a consumer and outputs new records in a while() loop
- Implement a log tailer by composing a DoFn that incrementally returns new files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against this API:

ProcessContinuation processElement(
    ProcessContext context, OffsetRangeTracker tracker) {
  try (KafkaConsumer<String, String> consumer =
      Kafka.subscribe(context.element().topic,
                      context.element().partition)) {
    consumer.seek(tracker.start());
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100ms);
      if (records == null) return done();
      for (ConsumerRecord<String, String> record : records) {
        if (!tracker.tryClaim(record.offset())) {
          return resume().withFutureOutputWatermark(record.timestamp());
        }
        context.output(record);
      }
    }
  }
}

The document describes in detail the motivations behind this feature, the basic idea and API, open questions, and outlines an incremental delivery plan.
The proposed API builds on the reflection-based new DoFn [new-do-fn] and is loosely related to "State and Timers for DoFn" [beam-state].

Please take a look and comment!

Thanks.

[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
