Thanks for the explanation, Eugene and JB. By the way, I'm not trying to poke holes in this; I really like the feature. I just sometimes wonder how a specific thing might be implemented with it.
On Mon, 29 Aug 2016 at 18:00 Eugene Kirpichov <[email protected]> wrote: > Hi Aljoscha, > > The watermark reporting is done via > ProcessContinuation.futureOutputWatermark, at the granularity of returning > from individual processElement() calls - you return from the call and give > a watermark on your future output. We assume that updating watermark is > sufficient at a per-bundle level (or, if not, then that you can make > bundles small enough) cause that's the same level at which state changes, > timers etc. are committed. > It can be implemented by setting a per-key watermark hold and updating it > when each call for this element returns. That's the way it is implemented > in my current prototype https://github.com/apache/incubator-beam/pull/896 > (see > SplittableParDo.ProcessFn) > > On Mon, Aug 29, 2016 at 2:55 AM Aljoscha Krettek <[email protected]> > wrote: > > > Hi, > > I have another question about this: currently, unbounded sources have > > special logic for determining the watermark and the system periodically > > asks the sources for the current watermark. As I understood it, > watermarks > > are only "generated" at the sources. How will this work when sources are > > implemented as a combination of DoFns and SplittableDoFns? Will > > SplittableDoFns be asked for a watermark, does this mean that watermarks > > can then be "generated" at any operation? > > > > Cheers, > > Aljoscha > > > > On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov > <[email protected] > > > > > wrote: > > > > > Hi JB, > > > > > > Yes, I'm assuming you're referring to the "magic" part on the transform > > > expansion diagram. This is indeed runner-specific, and timers+state are > > > likely the simplest way to do this for an SDF that does unbounded > amount > > of > > > work. > > > > > > On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <[email protected] > > > > > wrote: > > > > > > > Anyway, from a runner perspective, we will have kind of API (part of > > the > > > > Runner API) to "orchestrate" the SDF as we discussed during the call, > > > > right ? > > > > > > > > Regards > > > > JB > > > > > > > > On 08/21/2016 07:24 PM, Eugene Kirpichov wrote: > > > > > Hi Aljoscha, > > > > > This is an excellent question! And the answer is, we don't need any > > new > > > > > concepts like "SDF executor" and can rely on the per-key state and > > > timers > > > > > machinery that already exists in all runners because it's necessary > > to > > > > > implement windowing/triggering properly. > > > > > > > > > > Note that this is already somewhat addressed in the previously > posted > > > > State > > > > > and Timers proposal https://s.apache.org/beam-state , under > "per-key > > > > > workflows". > > > > > > > > > > Think of it this way, using the Kafka example: we'll expand it > into a > > > > > transform: > > > > > > > > > > (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for > > > > > partition in topic.listPartitions() } > > > > > (2) GroupByKey > > > > > (3) ParDo { key, topic, partition, R -> Kafka reader code in the > > > > > proposal/slides } > > > > > - R is the OffsetRange restriction which in this case will be > > always > > > of > > > > > the form [startOffset, inf). > > > > > - there'll be just 1 value per key, but we use GBK to just get > > access > > > > to > > > > > the per-key state/timers machinery. This may be runner-specific; > > maybe > > > > some > > > > > runners don't need a GBK to do that. 
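To make the per-key watermark hold concrete, here is a minimal, library-free sketch; all class and method names below are made up for illustration, and the real mechanics live in SplittableParDo.ProcessFn in the PR linked above. The idea it shows: each time a processElement() call for a key returns with a future output watermark, that becomes the key's hold, and the output watermark of the splittable ParDo is the minimum over all holds; in this toy, a key that reports "do not resume" simply releases its hold.

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Toy illustration only -- not Beam runner code.
public class WatermarkHoldSketch {
  private final Map<String, Instant> holdPerKey = new HashMap<>();

  /** Called when processElement() for this key returns, reporting a future output watermark. */
  public void onCallReturned(String key, Instant futureOutputWatermark) {
    holdPerKey.put(key, futureOutputWatermark);
  }

  /** Called when the element's restriction is exhausted ("do not resume"): release the hold. */
  public void onKeyDone(String key) {
    holdPerKey.remove(key);
  }

  /** Output watermark of the splittable ParDo: the minimum over all per-key holds. */
  public Instant outputWatermark() {
    return holdPerKey.values().stream().min(Instant::compareTo).orElse(Instant.MAX);
  }
}
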
> > > > > > > > > > Now suppose the topic has two partitions, P1 and P2, and they get > > > > assigned > > > > > unique keys K1, K2. > > > > > Then the input to (3) will be a collection of: (K1, topic, P1, [0, > > > inf)), > > > > > (K2, topic, P2, [0, inf)). > > > > > Suppose we have just 1 worker with just 1 thread. Now, how will > this > > > > thread > > > > > be able to produce elements from both P1 and P2? here's how. > > > > > > > > > > The thread will process (K1, topic, P1, [0, inf)), checkpoint > after a > > > > > certain time or after a certain number of elements are output (just > > > like > > > > > with the current UnboundedSource reading code) producing a residual > > > > > restriction R1' (basically a new start timestamp), put R11 into the > > > > per-key > > > > > state and set a timer T1 to resume. > > > > > Then it will process (K2, topic, P2, [0, inf)), do the same > > producing a > > > > > residual restriction R2' and setting a timer T2 to resume. > > > > > Then timer T1 will fire in the context of the key K1. The thread > will > > > > call > > > > > processElement again, this time supplying R1' as the restriction; > the > > > > > process repeats and after a while it checkpoints and stores R1'' > into > > > > state > > > > > of K1. > > > > > Then timer T2 will fire in the context of K2, run processElement > for > > a > > > > > while, set a new timer and store R2'' into the state of K2. > > > > > Etc. > > > > > If partition 1 goes away, the processElement call will return "do > not > > > > > resume", so a timer will not be set and instead the state > associated > > > with > > > > > K1 will be GC'd. > > > > > > > > > > So basically it's almost like cooperative thread scheduling: things > > run > > > > for > > > > > a while, until the runner tells them to checkpoint, then they set a > > > timer > > > > > to resume themselves, and the runner fires the timers, and the > > process > > > > > repeats. And, again, this only requires things that runners can > > already > > > > do > > > > > - state and timers, but no new concept of SDF executor (and > > > consequently > > > > no > > > > > necessity to choose/tune how many you need). > > > > > > > > > > Makes sense? > > > > > > > > > > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek < > > [email protected]> > > > > > wrote: > > > > > > > > > >> Hi, > > > > >> I have another question that I think wasn't addressed in the > > meeting. > > > At > > > > >> least it wasn't mentioned in the notes. > > > > >> > > > > >> In the context of replacing sources by a combination of to SDFs, > how > > > do > > > > you > > > > >> determine how many "SDF executor" instances you need downstream? > For > > > the > > > > >> sake of argument assume that both SDFs are executed with > > parallelism 1 > > > > (or > > > > >> one per worker). Now, if you have a file source that reads from a > > > static > > > > >> set of files the first SDF would emit the filenames while the > second > > > SDF > > > > >> would receive the filenames and emit their contents. This works > well > > > and > > > > >> the downstream SDF can process one filename after the other. Now, > > > think > > > > of > > > > >> something like a Kafka source. The first SDF would emit the > > partitions > > > > (say > > > > >> 4 partitions, in this example) and the second SDF would be > > responsible > > > > for > > > > >> reading from a topic and emitting elements. Reading from one topic > > > never > > > > >> finishes so you can't process the topics in series. 
I think you > > would > > > > need > > > > >> to have 4 downstream "SDF executor" instances. The question now > is: > > > how > > > > do > > > > >> you determine whether you are in the first or the second > situation? > > > > >> > > > > >> Probably I'm just overlooking something and this is already dealt > > with > > > > >> somewhere... :-) > > > > >> > > > > >> Cheers, > > > > >> Aljoscha > > > > >> > > > > >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]> > > wrote: > > > > >> > > > > >>> Hello, > > > > >>> > > > > >>> Thanks for the notes both Dan and Eugene, and for taking the time > > to > > > do > > > > >> the > > > > >>> presentation and answer our questions. > > > > >>> > > > > >>> I mentioned the ongoing work on dynamic scaling on Flink because > I > > > > >> suppose > > > > >>> that it will address dynamic rebalancing eventually (there are > > > multiple > > > > >>> changes going on for dynamic scaling). > > > > >>> > > > > >>> > > > > >>> > > > > >> > > > > > > > > > > https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4 > > > > >>> > > > > >>> > > > https://lists.apache.org/[email protected]:lte=1M:FLIP-8 > > > > >>> > > > > >>> Anyway I am far from an expert on flink, but probably the flink > > guys > > > > can > > > > >>> give their opinion about this and refer to a more precise > document > > > that > > > > >> the > > > > >>> ones I mentioned.. > > > > >>> > > > > >>> Thanks again, > > > > >>> Ismaël > > > > >>> > > > > >>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré < > > > [email protected] > > > > > > > > > >>> wrote: > > > > >>> > > > > >>>> Great summary Eugene and Dan. > > > > >>>> > > > > >>>> And thanks again for the details, explanation, and discussion. > > > > >>>> > > > > >>>> Regards > > > > >>>> JB > > > > >>>> > > > > >>>> > > > > >>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote: > > > > >>>> > > > > >>>>> Thanks for attending, everybody! > > > > >>>>> > > > > >>>>> Here are meeting notes (thanks Dan!). > > > > >>>>> > > > > >>>>> Q: Will SplittableDoFn enable better repartitioning of the > > > > >> input/output > > > > >>>>> data? > > > > >>>>> A: Not really; repartitioning is orthogonal to SDF. > > > > >>>>> > > > > >>>>> Current Source API suffers from lack of composition and > > scalability > > > > >>>>> because > > > > >>>>> we treat sources too much as metadata, not enough as data. > > > > >>>>> > > > > >>>>> Q(slide with transform expansion): who does the "magic"? > > > > >>>>> A: The runner. Checkpointing and dynamically splitting > > restrictions > > > > >> will > > > > >>>>> require collaboration with the runner. > > > > >>>>> > > > > >>>>> Q: How does the runner interact with the DoFn to control the > > > > >>> restrictions? > > > > >>>>> Is it related to the centralized job tracker etc.? > > > > >>>>> A: RestrictionTracker is a simple helper object, that exists > > purely > > > > on > > > > >>> the > > > > >>>>> worker while executing a single partition, and interacts with > the > > > > >> worker > > > > >>>>> harness part of the runner. Not to be confused with the > > centralized > > > > >> job > > > > >>>>> tracker (master) - completely unrelated. Worker harness, of > > course, > > > > >>>>> interacts with the master in some relevant ways (e.g. Dataflow > > > master > > > > >>> can > > > > >>>>> tell "you're a straggler, you should split"). 
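Eugene's checkpoint-and-resume loop above, together with the worker-local RestrictionTracker from the notes, can be boiled down to a toy single-threaded simulation. This is purely illustrative plain Java with made-up names, not Beam code: each key carries a restriction [startOffset, inf), a call claims offsets through a tracker until a per-call budget runs out, and then the residual restriction is re-enqueued, which stands in for "store it in per-key state and set a timer to resume".

import java.util.ArrayDeque;
import java.util.Deque;

// Toy illustration only, not Beam code: one worker thread alternating between
// two per-key restrictions of the form [startOffset, inf), checkpointing each
// call after a fixed claim budget and re-enqueueing the residual restriction.
public class CooperativeSchedulingSketch {

  /** A key plus the start of its (residual) restriction. */
  static class Work {
    final String key;
    final long restrictionStart;
    Work(String key, long restrictionStart) {
      this.key = key;
      this.restrictionStart = restrictionStart;
    }
  }

  /** Stand-in for a worker-local restriction tracker over [start, inf). */
  static class ToyOffsetTracker {
    private final long start;
    private final long claimBudget;
    private long nextUnclaimed;

    ToyOffsetTracker(long start, long claimBudget) {
      this.start = start;
      this.claimBudget = claimBudget;
      this.nextUnclaimed = start;
    }

    /** Claims one offset; refuses once this call's budget is exhausted (time to checkpoint). */
    boolean tryClaim(long offset) {
      if (offset - start >= claimBudget) {
        return false;
      }
      nextUnclaimed = offset + 1;
      return true;
    }

    /** The residual restriction to resume from later: [nextUnclaimed, inf). */
    long residualStart() {
      return nextUnclaimed;
    }
  }

  public static void main(String[] args) {
    // "Timers": which keys are due to run, and from which offset.
    Deque<Work> timers = new ArrayDeque<>();
    timers.add(new Work("K1 / partition P1", 0));
    timers.add(new Work("K2 / partition P2", 0));

    // A few rounds of cooperative scheduling on a single thread.
    for (int round = 0; round < 4; round++) {
      Work work = timers.poll();
      ToyOffsetTracker tracker = new ToyOffsetTracker(work.restrictionStart, 3);
      for (long offset = work.restrictionStart; tracker.tryClaim(offset); offset++) {
        System.out.println(work.key + ": output record at offset " + offset);
      }
      timers.add(new Work(work.key, tracker.residualStart()));
    }
  }
}

In this toy, a partition going away would correspond to returning "do not resume" instead of re-enqueueing, at which point the key's state would be garbage-collected, as described above.
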
> > > > >>>>> > > > > >>>>> Q: Is this a new DoFn subclass, or how will this integrate with > > the > > > > >>>>> existing code? > > > > >>>>> A: It's a feature of reflection-based DoFn ( > > > > >>> https://s.apache.org/a-new-do > > > > >>>>> fn) > > > > >>>>> - just another optional parameter of type RestrictionTracker to > > > > >>>>> processElement() which is dynamically bound via reflection, so > > > fully > > > > >>>>> backward/forward compatible, and looks to users like a regular > > > DoFn. > > > > >>>>> > > > > >>>>> Q: why is fractionClaimed a double? > > > > >>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, > > dynamic > > > > >>>>> rebalancing) requires a uniform way to represent progress > through > > > > >>>>> different > > > > >>>>> sources. > > > > >>>>> > > > > >>>>> Q: Spark runner is microbatch-based, so this seems to map well > > onto > > > > >>>>> checkpoint/resume, right? > > > > >>>>> A: Yes; actually the Dataflow runner is, at a worker level, > also > > > > >>>>> microbatch-based. The way SDF interacts with a runner will be > > very > > > > >>> similar > > > > >>>>> to how a Bounded/UnboundedSource interacts with a runner. > > > > >>>>> > > > > >>>>> Q: Using SDF, what would be the "packaging" of the IO? > > > > >>>>> A: Same as currently: package IO's as PTransforms and their > > > > >>> implementation > > > > >>>>> under the hood can be anything: Source, simple ParDo's, SDF, > etc. > > > > E.g. > > > > >>>>> Datastore was recently refactored from BoundedSource to ParDo > > > (ended > > > > >> up > > > > >>>>> simpler and more scalable), transparently to users. > > > > >>>>> > > > > >>>>> Q: What's the timeline; what to do with the IOs currently in > > > > >>> development? > > > > >>>>> A: Timeline is O(months). Keep doing what you're doing and > > working > > > on > > > > >>> top > > > > >>>>> of Source APIs when necessary and simple ParDo's otherwise. > > > > >>>>> > > > > >>>>> Q: What's the impact for the runner writers? > > > > >>>>> A: Tentatively expected that most of the code for running an > SDF > > > will > > > > >> be > > > > >>>>> common to runners, with some amount of per-runner glue code, > just > > > > like > > > > >>>>> GBK/windowing/triggering. Impact on Dataflow runner is larger > > since > > > > it > > > > >>>>> supports dynamic rebalancing in batch mode and this is the > > hardest > > > > >> part, > > > > >>>>> but for other runners shouldn't be too hard. > > > > >>>>> > > > > >>>>> JB: Talend has people who can help with this: e.g. help > integrate > > > > into > > > > >>>>> Spark runner, refactor IOs etc. Amit also willing to chat about > > > > >>> supporting > > > > >>>>> SDF in Spark runner. > > > > >>>>> > > > > >>>>> Ismael: There's a Flink proposal about dynamic rebalancing. > > Ismael > > > > >> will > > > > >>>>> send a link. > > > > >>>>> > > > > >>>>> > > > > >>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré < > > > > [email protected] > > > > >>> > > > > >>>>> wrote: > > > > >>>>> > > > > >>>>> Hi Eugene, > > > > >>>>>> > > > > >>>>>> thanks for the reminder. > > > > >>>>>> > > > > >>>>>> Just to prepare some topics for the call, please find some > > points: > > > > >>>>>> > > > > >>>>>> 1. Using SDF, what would be the "packaging" of the IO ? It > > sounds > > > to > > > > >> me > > > > >>>>>> that we can keep the IO packaging style (using with* setters > for > > > the > > > > >> IO > > > > >>>>>> configuration) and replace PTransform, Source, Reader, ... > > > directly > > > > >>> with > > > > >>>>>> SDF. 
Correct ? > > > > >>>>>> > > > > >>>>>> 2. What's your plan in term of release to include SDF ? We > have > > > > >> several > > > > >>>>>> IOs in preparation and I wonder if it's worth to start to use > > the > > > > new > > > > >>>>>> SDF API or not. > > > > >>>>>> > > > > >>>>>> 3. What's the impact for the runner writers ? The runners will > > > have > > > > >> to > > > > >>>>>> support SDF, that could be tricky depending of the execution > > > engine. > > > > >> In > > > > >>>>>> the worst case where the runner can't fully support SDF, does > it > > > > mean > > > > >>>>>> that most of our IOs will be useless ? > > > > >>>>>> > > > > >>>>>> Just my dumb topics ;) > > > > >>>>>> > > > > >>>>>> Thanks, > > > > >>>>>> See you at 8am ! > > > > >>>>>> > > > > >>>>>> Regards > > > > >>>>>> JB > > > > >>>>>> > > > > >>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote: > > > > >>>>>> > > > > >>>>>>> Hello everybody, > > > > >>>>>>> > > > > >>>>>>> Just a reminder: > > > > >>>>>>> > > > > >>>>>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am > > PST, > > > > to > > > > >>>>>>> join > > > > >>>>>>> the call go to > > > > >>>>>>> > > https://hangouts.google.com/hangouts/_/google.com/splittabledofn > > > . > > > > >>>>>>> I intend to go over the proposed design and then have a > > free-form > > > > >>>>>>> discussion. > > > > >>>>>>> > > > > >>>>>>> Please have a skim through the proposal doc: > > > https://s.apache.org/ > > > > >>>>>>> splittable-do-fn > > > > >>>>>>> I also made some slides that are basically a trimmed-down > > version > > > > of > > > > >>> the > > > > >>>>>>> doc to use as a guide when conducting the meeting, > > > > >>>>>>> > > > > >>>>>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047 > > > > >>>>>> Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing > > > > >>>>>> > > > > >>>>>>> . > > > > >>>>>>> > > > > >>>>>>> I will post notes from the meeting on this thread afterwards. > > > > >>>>>>> > > > > >>>>>>> Thanks, looking forward. > > > > >>>>>>> > > > > >>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin > > > > >>>>>>> <[email protected] > > > > >>>>>>> > > > > >>>>>>> wrote: > > > > >>>>>>> > > > > >>>>>>> This is pretty cool! I'll be there too. (unless the hangout > > gets > > > > too > > > > >>>>>>>> > > > > >>>>>>> full > > > > >>>>>> > > > > >>>>>>> -- if so, I'll drop out in favor of others who aren't lucky > > > enough > > > > >> to > > > > >>>>>>>> > > > > >>>>>>> get > > > > >>>>>> > > > > >>>>>>> to talk to Eugene all the time.) 
> > > > >>>>>>>> > > > > >>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis < > > > > >>>>>>>> > > > > >>>>>>> [email protected]> > > > > >>>>>> > > > > >>>>>>> wrote: > > > > >>>>>>>> > > > > >>>>>>>> +1 I'll join > > > > >>>>>>>>> > > > > >>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) < > > > > >>>>>>>>> > > > > >>>>>>>> [email protected] > > > > >>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>>>>> wrote: > > > > >>>>>>>>> > > > > >>>>>>>>> + 1, me2 > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected] > > > > >>>>>>>>>> > > > > >>>>>>>>> <javascript:;>> > > > > >>>>>> > > > > >>>>>>> wrote: > > > > >>>>>>>>>> > > > > >>>>>>>>>> +1 as in I'll join ;-) > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov > > > > >>>>>>>>>>> > > > > >>>>>>>>>> <[email protected] > > > > >>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>>> wrote: > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> Sounds good, thanks! > > > > >>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST, > > > > >>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google > . > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>> com/splittabledofn > > > > >>>>>>>>>> > > > > >>>>>>>>>>> > > > > >>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré < > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>> [email protected] > > > > >>>>>>>>> > > > > >>>>>>>>>> <javascript:;>> > > > > >>>>>>>>>> > > > > >>>>>>>>>>> wrote: > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> Hi > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What > > > about > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>> Friday > > > > >>>>>>>> > > > > >>>>>>>>> 19th ? > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Regards > > > > >>>>>>>>>>>>> JB > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov > > > > >>>>>>>>>>>>> <[email protected]> wrote: > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Hi JB, > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Sounds great, does the suggested time over > > videoconference > > > > >> work > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>> for > > > > >>>>>>>> > > > > >>>>>>>>> you? > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré > < > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>> [email protected] <javascript:;>> > > > > >>>>>>>>>> > > > > >>>>>>>>>>> wrote: > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Hi Eugene > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> May we talk together next week ? I like the > proposal. I > > > > >> would > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> just > > > > >>>>>>>>> > > > > >>>>>>>>>> need > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> some details for my understanding. > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> Thanks > > > > >>>>>>>>>>>>>>> Regards > > > > >>>>>>>>>>>>>>> JB > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov > > > > >>>>>>>>>>>>>>> <[email protected]> wrote: > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> Hi JB, > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> What are your thoughts on this? 
> > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to > > explain > > > > >> more > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> about > > > > >>>>>>>>>> > > > > >>>>>>>>>>> this > > > > >>>>>>>>>>>>>>>> proposal if necessary, since I understand it is a > lot > > to > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> digest. > > > > >>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over > > > > >>> Hangouts? > > > > >>>>>>>>>>>>>>>> (link: > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > https://staging.talkgadget.google.com/hangouts/_/google. > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>> com/splittabledofn > > > > >>>>>>>>>> > > > > >>>>>>>>>>> - > > > > >>>>>>>>>>>>>>>> I confirmed that it can be joined without being > logged > > > > >> into a > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> Google > > > > >>>>>>>>>> > > > > >>>>>>>>>>> account) > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> Who'd be interested in attending, and does this > > > time/date > > > > >>> work > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> for > > > > >>>>>>>>> > > > > >>>>>>>>>> people? > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> <[email protected] <javascript:;>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> wrote: > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments! > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> It sounds like you are concerned about continued > > > support > > > > >> for > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> existing > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> IO's > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> people have developed, and about backward > > > compatibility? > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all > > > existing > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> Source-based > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> connectors will continue to work [though the > document > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> proposes > > > > >>>>>>>> > > > > >>>>>>>>> at > > > > >>>>>>>>>> > > > > >>>>>>>>>>> some > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> point to make Read.from(Source) to translate to a > > > wrapper > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> SDF > > > > >>>>>>>> > > > > >>>>>>>>> under > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> the > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> hood, to exercise the feature more and to make sure > > > that > > > > >> it > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> is > > > > >>>>>>>> > > > > >>>>>>>>> strictly > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> more powerful - but this is an optional > > implementation > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> detail]. 
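On the "wrapper SDF under the hood" point, here is a shape-only sketch of what wrapping a Reader-style start()/advance() state machine in a splittable call could look like. All types below are made up; the real Source and SDF APIs differ, and a real wrapper would also persist the reader's checkpoint mark as the residual restriction rather than restarting from start().

import java.util.List;

// Toy illustration only -- neither the real Source API nor the real SDF API.
public class SourceAsSdfSketch {

  /** Stand-in for a Source-style reader state machine. */
  interface ToyReader<T> {
    boolean start();    // move to the first record, if any
    boolean advance();  // move to the next record, if any
    T getCurrent();
  }

  /** Stand-in for a restriction tracker that bounds how much one call may do. */
  static class ToyClaimBudget {
    private int remaining;
    ToyClaimBudget(int budget) {
      this.remaining = budget;
    }
    boolean tryClaim() {
      return remaining-- > 0;
    }
  }

  /**
   * Stand-in for the splittable call: drain the reader until the budget runs out.
   * Returns true for "resume later" (a real wrapper would checkpoint the reader
   * here and report a future output watermark) and false for "do not resume".
   */
  static <T> boolean processElement(ToyReader<T> reader, ToyClaimBudget budget, List<T> output) {
    for (boolean hasRecord = reader.start(); hasRecord; hasRecord = reader.advance()) {
      if (!budget.tryClaim()) {
        return true;
      }
      output.add(reader.getCurrent());
    }
    return false;
  }
}
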
> > > > >>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly - > > > > >> "replacing > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> the > > > > >>>>>>>>> > > > > >>>>>>>>>> Source > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> API": a better phrasing would be "introducing a new > > API > > > > so > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> powerful > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> and > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> easy-to-use that hopefully people will choose it > over > > > the > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> Source > > > > >>>>>>>>> > > > > >>>>>>>>>> API > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> all > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> the time, even though they don't have to" :) And we > > can > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> discuss > > > > >>>>>>>>> > > > > >>>>>>>>>> whether or > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> not to actually deprecate/remove the Source API at > > some > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> point > > > > >>>>>>>> > > > > >>>>>>>>> down > > > > >>>>>>>>>> > > > > >>>>>>>>>>> the > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> road, once it becomes clear whether this is the > case > > or > > > > >> not. > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> To give more context: this proposal came out of > > > > >> discussions > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> within > > > > >>>>>>>>>> > > > > >>>>>>>>>>> the SDK > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam > > project > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> existed, > > > > >>>>>>>>> > > > > >>>>>>>>>> on > > > > >>>>>>>>>> > > > > >>>>>>>>>>> how to > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> make major improvements to the Source API; perhaps > it > > > > will > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> clarify > > > > >>>>>>>>>> > > > > >>>>>>>>>>> things > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> if I give a history of the ideas discussed: > > > > >>>>>>>>>>>>>>>>> - The first idea was to introduce a > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> Read.from(PCollection<Source>) > > > > >>>>>>>>>> > > > > >>>>>>>>>>> transform while keeping the Source API intact - this, > given > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> appropriate > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> implementation, would solve most of the scalability > > and > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> composability > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> issues of IO's. Then most connectors would look like > : > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> ParDo<A, > > > > >>>>>>>>> > > > > >>>>>>>>>> Source<B>> > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> + Read.from(). > > > > >>>>>>>>>>>>>>>>> - Then we figured that the Source class is an > > > unnecessary > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> abstraction, as > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> it simply holds data. What if we only had a > Reader<S, > > > B> > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> class > > > > >>>>>>>> > > > > >>>>>>>>> where > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> S is > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> the source type and B the output type? 
Then > > connectors > > > > >> would > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> be > > > > >>>>>>>>> > > > > >>>>>>>>>> something > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical > Read.using(Reader<S, > > > > B>). > > > > >>>>>>>>>>>>>>>>> - Then somebody remarked that some of the features > of > > > > >> Source > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> are > > > > >>>>>>>>> > > > > >>>>>>>>>> useful to > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress > when > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> processing a > > > > >>>>>>>>>> > > > > >>>>>>>>>>> very > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> heavy element, or ability to produce very large > > output > > > in > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> parallel. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> - The two previous bullets were already hinting that > > the > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> Read.using() > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> primitive might not be so special: it just takes S > and > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> produces > > > > >>>>>>>>> > > > > >>>>>>>>>> B: > > > > >>>>>>>>>> > > > > >>>>>>>>>>> isn't > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> that what a ParDo does, plus some source magic, > minus > > > the > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> convenience > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> of > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> c.output() vs. the start/advance() state machine? > > > > >>>>>>>>>>>>>>>>> - At this point it became clear that we should > > explore > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> unifying > > > > >>>>>>>>> > > > > >>>>>>>>>> sources > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic > of > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> sources > > > > >>>>>>>> > > > > >>>>>>>>> to > > > > >>>>>>>>> > > > > >>>>>>>>>> ParDo's > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> but without the limitations and coding > > inconveniences? > > > > And > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> this > > > > >>>>>>>>> > > > > >>>>>>>>>> is > > > > >>>>>>>>>> > > > > >>>>>>>>>>> how > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic to a > > > DoFn > > > > >> by > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> providing > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> a > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> RangeTracker. 
> > > > >>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it > > > > became > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> clear > > > > >>>>>>>>> > > > > >>>>>>>>>> that > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> it > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> is strictly more general than sources; at least, in > > the > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> respect > > > > >>>>>>>>> > > > > >>>>>>>>>> that > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> sources have to produce output, while DoFn's don't: > an > > > SDF > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> may > > > > >>>>>>>> > > > > >>>>>>>>> very > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> well > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> produce no output at all, and simply perform a side > > > > effect > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> in > > > > >>>>>>>> > > > > >>>>>>>>> a > > > > >>>>>>>>> > > > > >>>>>>>>>> parallel/resumable way. > > > > >>>>>>>>>>>>>>>>> - Then there were countless hours of discussions on > > > > >> unifying > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> the > > > > >>>>>>>>> > > > > >>>>>>>>>> bounded/unbounded cases, on the particulars of > RangeTracker > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> APIs > > > > >>>>>>>>> > > > > >>>>>>>>>> reconciling parallelization and checkpointing, what the > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> relation > > > > >>>>>>>>> > > > > >>>>>>>>>> between > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the > > > current > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> proposal. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> The > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> proposal comes at a time when a couple of key > > > ingredients > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> are > > > > >>>>>>>> > > > > >>>>>>>>> (almost) > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular > DoFn, > > > and > > > > >> the > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> State/Timers > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> proposal to enable unbounded work per element. > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> To put it shortly: > > > > >>>>>>>>>>>>>>>>> - Yes, we will support existing Source connectors, > > and > > > > >> will > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> support > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> writing new ones, possibly forever. There is no > > > > interference > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> with > > > > >>>>>>>>>> > > > > >>>>>>>>>>> current > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> users of Source. > > > > >>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source > > API, > > > > >> taken > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> to > > > > >>>>>>>>> > > > > >>>>>>>>>> its > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> logical limit where it turns out that users' goals > can > > be > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> accomplished > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> easier and more generically entirely within > ParDo's. > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> Let me know what you think, and thanks again! 
> > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>> <[email protected] <javascript:;>> > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> wrote: > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> Hi Eugene, > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> Just a question: why is it in DoFn and note an > > > > >> improvement > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> of > > > > >>>>>>>> > > > > >>>>>>>>> Source > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> ? > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will > > have > > > to > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> refactore > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> all > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> existing IO: basically, what you propose is to > remove > > > all > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> Source > > > > >>>>>>>>>> > > > > >>>>>>>>>>> to > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> replace with NewDoFn. > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> I'm concern with this approach, especially in term > > of > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> timing: > > > > >>>>>>>> > > > > >>>>>>>>> clearly, > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> the IO is the area where we have to move forward in > > > Beam > > > > >> as > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> it > > > > >>>>>>>>> > > > > >>>>>>>>>> will > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> allow new users to start in their projects. > > > > >>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, > > > Cassandra, > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> MongoDB, > > > > >>>>>>>>>> > > > > >>>>>>>>>>> JDBC, > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> ... and some people started to learn the IO API > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> (Bounded/Unbouded > > > > >>>>>>>>>> > > > > >>>>>>>>>>> source, etc). > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO > > API > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> (Source) > > > > >>>>>>>>> > > > > >>>>>>>>>> instead > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> of introducing a NewDoFn. > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> What are your thoughts for IO writer like me ? ;) > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> Regards > > > > >>>>>>>>>>>>>>>>>> JB > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote: > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> Hello Beam community, > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) > would > > > > like > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> to > > > > >>>>>>>> > > > > >>>>>>>>> propose > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn, > > > which > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> allows > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> processing > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e. 
> > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> checkpointable > > > > >>>>>>>>> > > > > >>>>>>>>>> and > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded amount > of > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> work > > > > >>>>>>>>> > > > > >>>>>>>>>> per > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> element. > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> This allows effectively replacing the current > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> Bounded/UnboundedSource > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> APIs > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more > > > scalable > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> and > > > > >>>>>>>> > > > > >>>>>>>>> composable > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> with > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> the rest of the Beam programming model, and > enables > > > > many > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> use > > > > >>>>>>>>> > > > > >>>>>>>>>> cases > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> that > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> were previously difficult or impossible, as well as > > > some > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> non-obvious new > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> use cases. > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA > > > > >> [BEAM-65] > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> and > > > > >>>>>>>>> > > > > >>>>>>>>>> some > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> Beam > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> meetings, and now the whole thing is written up in > a > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> document: > > > > >>>>>>>>>> > > > > >>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> https://s.apache.org/splittable-do-fn > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> Here are some things that become possible with > > > > >> Splittable > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> DoFn: > > > > >>>>>>>>>> > > > > >>>>>>>>>>> - Efficiently read a filepattern matching millions of > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> files > > > > >>>>>>>> > > > > >>>>>>>>> - Read a collection of files that are produced by an > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> earlier > > > > >>>>>>>>> > > > > >>>>>>>>>> step > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> in the > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> pipeline (e.g. 
easily implement a connector to a > > > storage > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> system > > > > >>>>>>>>>> > > > > >>>>>>>>>>> that can > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> export itself to files) > > > > >>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> partitions" > > > > >>>>>>>> > > > > >>>>>>>>> DoFn > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> with a > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new > > > records > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> in > > > > >>>>>>>>> > > > > >>>>>>>>>> a > > > > >>>>>>>>>> > > > > >>>>>>>>>>> while() > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> loop > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> incrementally > > > > >>>>>>>>>> > > > > >>>>>>>>>>> returns > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> new > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> files in a directory and a DoFn that tails a file > > > > >>>>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common" > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> algorithm > > > > >>>>>>>> > > > > >>>>>>>>> (matrix > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> squaring) with good work balancing > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical > Kafka > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> reader > > > > >>>>>>>> > > > > >>>>>>>>> written > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> against > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> this API: > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> ProcessContinuation processElement( > > > > >>>>>>>>>>>>>>>>>>> ProcessContext context, > > > OffsetRangeTracker > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> tracker) > > > > >>>>>>>>>> > > > > >>>>>>>>>>> { > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> try (KafkaConsumer<String, String> consumer = > > > > >>>>>>>>>>>>>>>>>>> > > > > Kafka.subscribe(context.element().topic, > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> context.element().partition)) { > > > > >>>>>>>>>> > > > > >>>>>>>>>>> consumer.seek(tracker.start()); > > > > >>>>>>>>>>>>>>>>>>> while (true) { > > > > >>>>>>>>>>>>>>>>>>> ConsumerRecords<String, String> > records = > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> consumer.poll(100ms); > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> if (records == null) return done(); > > > > >>>>>>>>>>>>>>>>>>> for (ConsumerRecord<String, String> > > record > > > : > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> records) > > > > >>>>>>>>>> > > > > >>>>>>>>>>> { > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> if (!tracker.tryClaim(record.offset())) { > > > > >>>>>>>>>>>>>>>>>>> return > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> > > > resume().withFutureOutputWatermark(record.timestamp()); > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> } > > > > >>>>>>>>>>>>>>>>>>> context.output(record); > > > > >>>>>>>>>>>>>>>>>>> } > > > > >>>>>>>>>>>>>>>>>>> } > > > > >>>>>>>>>>>>>>>>>>> } > > > > >>>>>>>>>>>>>>>>>>> } > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> The document describes in detail the motivations > > > behind > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> this > > > > >>>>>>>>> > > > > >>>>>>>>>> feature, > > > > 
>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> the > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> basic idea and API, open questions, and outlines > an > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> incremental > > > > >>>>>>>>>> > > > > >>>>>>>>>>> delivery > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> plan. > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based > new > > > > DoFn > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> [new-do-fn] > > > > >>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> and is > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> loosely related to "State and Timers for DoFn" > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> [beam-state]. > > > > >>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> Please take a look and comment! > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> Thanks. > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> [BEAM-65] > > > > https://issues.apache.org/jira/browse/BEAM-65 > > > > >>>>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn > > > > >>>>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> -- > > > > >>>>>>>>>>>>>>>>>> Jean-Baptiste Onofré > > > > >>>>>>>>>>>>>>>>>> [email protected] <javascript:;> > > > > >>>>>>>>>>>>>>>>>> http://blog.nanthrax.net > > > > >>>>>>>>>>>>>>>>>> Talend - http://www.talend.com > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>>> > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>>>> -- > > > > >>>>>>>>> Thanks, > > > > >>>>>>>>> Andrew > > > > >>>>>>>>> > > > > >>>>>>>>> Subscribe to my book: Streaming Data < > > > http://manning.com/psaltis > > > > > > > > > >>>>>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306> > > > > >>>>>>>>> twiiter: @itmdata < > > > > >>> http://twitter.com/intent/user?screen_name=itmdata > > > > >>>>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>>>> > > > > >>>>>>>> > > > > >>>>>>> > > > > >>>>>> -- > > > > >>>>>> Jean-Baptiste Onofré > > > > >>>>>> [email protected] > > > > >>>>>> http://blog.nanthrax.net > > > > >>>>>> Talend - http://www.talend.com > > > > >>>>>> > > > > >>>>>> > > > > >>>>> > > > > >>>> -- > > > > >>>> Jean-Baptiste Onofré > > > > >>>> [email protected] > > > > >>>> http://blog.nanthrax.net > > > > >>>> Talend - http://www.talend.com > > > > >>>> > > > > >>> > > > > >> > > > > > > > > > > > > > -- > > > > Jean-Baptiste Onofré > > > > [email protected] > > > > http://blog.nanthrax.net > > > > Talend - http://www.talend.com > > > > > > > > > >
