Thank you, JB! It will be helpful to have structured access to the technical discussions.
Looking into the Splittable DoFn proposal [1], the authors point out what I think is the main current design issue: Source objects cannot be produced by the pipeline at runtime. You are probably familiar with Husky [4], where they discuss dynamic object creation; the semantics may be different, but to me it seems to tackle the same problem. Could their approach then fit into the Beam model? I also wonder whether Husky could aspire to be a Beam runner; I would appreciate any remarks on this.

Is this proposal mainly related to supporting autoscaling and dynamic work rebalancing as described in [2] and [3]?

Thank you, and I am sorry if I have interrupted your discussion on the proposal.

[1] https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#
[2] https://cloud.google.com/blog/big-data/2016/03/comparing-cloud-dataflow-autoscaling-to-spark-and-hadoop
[3] https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
[4] http://www.husky-project.com/ ; http://www.vldb.org/pvldb/vol9/p420-yang.pdf

Best,
Ovidiu

> On 29 Aug 2016, at 14:54, Jean-Baptiste Onofré <[email protected]> wrote:
>
> Hi Ovidiu,
>
> We had a "Technical Discussions" link on the website menu, but I can't see it anymore on the website (I just see "Technical Vision"). It contains all the documents we are discussing.
>
> Agree with you to have an area where we store all "Technical Discussion Documents".
>
> Let me discuss with Frances about that.
>
> Thanks!
> Regards
> JB
>
> On 08/29/2016 02:50 PM, Ovidiu-Cristian MARCU wrote:
>> Hi everyone
>>
>> Is there any repository where one can track all proposals, something like Flink does with this wiki [1]?
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>
>> Thanks
>> Ovidiu
>>
>>> On 29 Aug 2016, at 12:01, Jean-Baptiste Onofré <[email protected]> wrote:
>>>
>>> Hi Aljoscha,
>>>
>>> Indeed, it's something we discussed during our call.
>>>
>>> AFAIU, it's one function of the tracker. When doing the tracker tryClaim (with an offset, partition id, or any kind of tracked "ID"), if the claim is not possible, then we will update the watermark.
>>>
>>> So the tracker is useful to determine the "split" and also to deal with the watermark.
>>>
>>> Regards
>>> JB
>>>
>>> On 08/29/2016 11:55 AM, Aljoscha Krettek wrote:
>>>> Hi,
>>>> I have another question about this: currently, unbounded sources have special logic for determining the watermark, and the system periodically asks the sources for the current watermark. As I understood it, watermarks are only "generated" at the sources. How will this work when sources are implemented as a combination of DoFns and SplittableDoFns? Will SplittableDoFns be asked for a watermark? Does this mean that watermarks can then be "generated" at any operation?
>>>>
>>>> Cheers,
>>>> Aljoscha
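As an illustration of the tryClaim/watermark interaction described above, here is a rough sketch in the style of the hypothetical Kafka snippet quoted later in this thread; OffsetRangeTracker, ProcessContinuation, resume() and withFutureOutputWatermark() come from that snippet, while LogShard, Record and readRecordAt() are made-up placeholders, and the real API may differ:

    // Sketch only: how a splittable DoFn might report a watermark when it suspends,
    // so watermarks are no longer "generated" exclusively inside Source objects.
    class TailShardFn extends DoFn<LogShard, Record> {
      ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
        long offset = tracker.start();
        while (true) {
          Record record = readRecordAt(c.element(), offset);  // hypothetical helper
          if (record == null) {
            // Nothing more to read right now: suspend, telling the runner how far
            // the output watermark may advance while we are suspended.
            return resume().withFutureOutputWatermark(Instant.now());  // placeholder watermark
          }
          if (!tracker.tryClaim(record.offset())) {
            // The runner checkpointed/split our restriction: stop here and let the
            // residual restriction resume later, carrying its own watermark.
            return resume().withFutureOutputWatermark(record.timestamp());
          }
          c.output(record);
          offset = record.offset() + 1;
        }
      }
    }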
>>>> On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <[email protected]> wrote:
>>>>
>>>>> Hi JB,
>>>>>
>>>>> Yes, I'm assuming you're referring to the "magic" part on the transform expansion diagram. This is indeed runner-specific, and timers+state are likely the simplest way to do this for an SDF that does an unbounded amount of work.
>>>>>
>>>>> On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>
>>>>>> Anyway, from a runner perspective, we will have some kind of API (part of the Runner API) to "orchestrate" the SDF as we discussed during the call, right?
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
>>>>>>> Hi Aljoscha,
>>>>>>> This is an excellent question! And the answer is, we don't need any new concepts like "SDF executor" and can rely on the per-key state and timers machinery that already exists in all runners because it's necessary to implement windowing/triggering properly.
>>>>>>>
>>>>>>> Note that this is already somewhat addressed in the previously posted State and Timers proposal https://s.apache.org/beam-state , under "per-key workflows".
>>>>>>>
>>>>>>> Think of it this way, using the Kafka example: we'll expand it into a transform:
>>>>>>>
>>>>>>> (1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for partition in topic.listPartitions() }
>>>>>>> (2) GroupByKey
>>>>>>> (3) ParDo { key, topic, partition, R -> Kafka reader code in the proposal/slides }
>>>>>>> - R is the OffsetRange restriction, which in this case will always be of the form [startOffset, inf).
>>>>>>> - there'll be just 1 value per key, but we use GBK just to get access to the per-key state/timers machinery. This may be runner-specific; maybe some runners don't need a GBK to do that.
>>>>>>>
>>>>>>> Now suppose the topic has two partitions, P1 and P2, and they get assigned unique keys K1, K2. Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)), (K2, topic, P2, [0, inf)). Suppose we have just 1 worker with just 1 thread. Now, how will this thread be able to produce elements from both P1 and P2? Here's how.
>>>>>>>
>>>>>>> The thread will process (K1, topic, P1, [0, inf)), checkpoint after a certain time or after a certain number of elements are output (just like with the current UnboundedSource reading code), producing a residual restriction R1' (basically a new start timestamp), put R1' into the per-key state and set a timer T1 to resume.
>>>>>>> Then it will process (K2, topic, P2, [0, inf)), do the same, producing a residual restriction R2' and setting a timer T2 to resume.
>>>>>>> Then timer T1 will fire in the context of the key K1. The thread will call processElement again, this time supplying R1' as the restriction; the process repeats, and after a while it checkpoints and stores R1'' into the state of K1.
>>>>>>> Then timer T2 will fire in the context of K2, run processElement for a while, set a new timer and store R2'' into the state of K2.
>>>>>>> Etc.
>>>>>>> If partition 1 goes away, the processElement call will return "do not resume", so a timer will not be set and instead the state associated with K1 will be GC'd.
>>>>>>>
>>>>>>> So basically it's almost like cooperative thread scheduling: things run for a while, until the runner tells them to checkpoint, then they set a timer to resume themselves, the runner fires the timers, and the process repeats. And, again, this only requires things that runners can already do - state and timers - but no new concept of an SDF executor (and consequently no necessity to choose/tune how many you need).
>>>>>>>
>>>>>>> Makes sense?
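A rough sketch of this expansion as a Beam Java pipeline fragment; KafkaPartition, ListPartitionsFn and ReadPartitionFn are hypothetical stand-ins, and the SDF-specific machinery (restrictions, trackers, per-key state and timers) is assumed to live inside ReadPartitionFn, so this is not the actual implementation:

    // (1) Expand each topic into (unique key, partition + initial restriction [0, inf)).
    PCollection<KV<String, KafkaPartition>> partitions =
        topics.apply("ListPartitions", ParDo.of(new ListPartitionsFn()));

    partitions
        // (2) GroupByKey purely to gain access to the per-key state/timers machinery.
        .apply(GroupByKey.<String, KafkaPartition>create())
        // (3) The splittable reader: claims offsets via a tracker, checkpoints residual
        //     restrictions into per-key state, and sets a timer to resume itself.
        .apply("ReadPartition", ParDo.of(new ReadPartitionFn()));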
>>>>>>> On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I have another question that I think wasn't addressed in the meeting. At least it wasn't mentioned in the notes.
>>>>>>>>
>>>>>>>> In the context of replacing sources by a combination of two SDFs, how do you determine how many "SDF executor" instances you need downstream? For the sake of argument assume that both SDFs are executed with parallelism 1 (or one per worker). Now, if you have a file source that reads from a static set of files, the first SDF would emit the filenames while the second SDF would receive the filenames and emit their contents. This works well and the downstream SDF can process one filename after the other. Now, think of something like a Kafka source. The first SDF would emit the partitions (say 4 partitions, in this example) and the second SDF would be responsible for reading from a topic and emitting elements. Reading from one topic never finishes, so you can't process the topics in series. I think you would need to have 4 downstream "SDF executor" instances. The question now is: how do you determine whether you are in the first or the second situation?
>>>>>>>>
>>>>>>>> Probably I'm just overlooking something and this is already dealt with somewhere... :-)
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> Thanks for the notes, both Dan and Eugene, and for taking the time to do the presentation and answer our questions.
>>>>>>>>>
>>>>>>>>> I mentioned the ongoing work on dynamic scaling on Flink because I suppose that it will address dynamic rebalancing eventually (there are multiple changes going on for dynamic scaling).
>>>>>>>>>
>>>>>>>>> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
>>>>>>>>> https://lists.apache.org/[email protected]:lte=1M:FLIP-8
>>>>>>>>>
>>>>>>>>> Anyway, I am far from an expert on Flink, but probably the Flink guys can give their opinion about this and refer to a more precise document than the ones I mentioned.
>>>>>>>>>
>>>>>>>>> Thanks again,
>>>>>>>>> Ismaël
>>>>>>>>>
>>>>>>>>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Great summary Eugene and Dan.
>>>>>>>>>> And thanks again for the details, explanation, and discussion.
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>>
>>>>>>>>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for attending, everybody!
>>>>>>>>>>>
>>>>>>>>>>> Here are meeting notes (thanks Dan!).
>>>>>>>>>>>
>>>>>>>>>>> Q: Will SplittableDoFn enable better repartitioning of the input/output data?
>>>>>>>>>>> A: Not really; repartitioning is orthogonal to SDF.
>>>>>>>>>>>
>>>>>>>>>>> The current Source API suffers from lack of composition and scalability because we treat sources too much as metadata, not enough as data.
>>>>>>>>>>>
>>>>>>>>>>> Q (slide with transform expansion): who does the "magic"?
>>>>>>>>>>> A: The runner. Checkpointing and dynamically splitting restrictions will require collaboration with the runner.
>>>>>>>>>>>
>>>>>>>>>>> Q: How does the runner interact with the DoFn to control the restrictions? Is it related to the centralized job tracker etc.?
>>>>>>>>>>> A: RestrictionTracker is a simple helper object that exists purely on the worker while executing a single partition, and interacts with the worker harness part of the runner. Not to be confused with the centralized job tracker (master) - completely unrelated. The worker harness, of course, interacts with the master in some relevant ways (e.g. the Dataflow master can tell "you're a straggler, you should split").
>>>>>>>>>>>
>>>>>>>>>>> Q: Is this a new DoFn subclass, or how will this integrate with the existing code?
>>>>>>>>>>> A: It's a feature of the reflection-based DoFn (https://s.apache.org/a-new-do-fn) - just another optional parameter of type RestrictionTracker to processElement(), which is dynamically bound via reflection, so fully backward/forward compatible, and looks to users like a regular DoFn.
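A minimal sketch of what that optional RestrictionTracker parameter might look like, mirroring the hypothetical Kafka snippet quoted further down in this thread; ReadLinesFn and readLine() are made-up placeholders, and the exact annotations and return types are still subject to the proposal:

    class ReadLinesFn extends DoFn<String, String> {
      @ProcessElement
      public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
        // The tracker parameter is bound via reflection; a DoFn that omits it
        // keeps behaving exactly like a regular DoFn.
        for (long i = tracker.start(); tracker.tryClaim(i); ++i) {
          c.output(readLine(c.element(), i));  // hypothetical helper: i-th line of a file
        }
        return done();
      }
    }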
>>>>>>>>>>> Q: Why is fractionClaimed a double?
>>>>>>>>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic rebalancing) requires a uniform way to represent progress through different sources.
>>>>>>>>>>>
>>>>>>>>>>> Q: The Spark runner is microbatch-based, so this seems to map well onto checkpoint/resume, right?
>>>>>>>>>>> A: Yes; actually the Dataflow runner is, at a worker level, also microbatch-based. The way SDF interacts with a runner will be very similar to how a Bounded/UnboundedSource interacts with a runner.
>>>>>>>>>>>
>>>>>>>>>>> Q: Using SDF, what would be the "packaging" of the IO?
>>>>>>>>>>> A: Same as currently: package IOs as PTransforms, and their implementation under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g. Datastore was recently refactored from BoundedSource to ParDo (ended up simpler and more scalable), transparently to users.
>>>>>>>>>>>
>>>>>>>>>>> Q: What's the timeline; what to do with the IOs currently in development?
>>>>>>>>>>> A: Timeline is O(months). Keep doing what you're doing, working on top of the Source APIs when necessary and simple ParDo's otherwise.
>>>>>>>>>>>
>>>>>>>>>>> Q: What's the impact for the runner writers?
>>>>>>>>>>> A: Tentatively expected that most of the code for running an SDF will be common to runners, with some amount of per-runner glue code, just like GBK/windowing/triggering. Impact on the Dataflow runner is larger since it supports dynamic rebalancing in batch mode and this is the hardest part, but for other runners it shouldn't be too hard.
>>>>>>>>>>>
>>>>>>>>>>> JB: Talend has people who can help with this: e.g. help integrate into the Spark runner, refactor IOs etc. Amit is also willing to chat about supporting SDF in the Spark runner.
>>>>>>>>>>>
>>>>>>>>>>> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will send a link.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Eugene,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the reminder.
>>>>>>>>>>>>
>>>>>>>>>>>> Just to prepare some topics for the call, please find some points:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Using SDF, what would be the "packaging" of the IO? It sounds to me that we can keep the IO packaging style (using with* setters for the IO configuration) and replace PTransform, Source, Reader, ... directly with SDF. Correct?
>>>>>>>>>>>>
>>>>>>>>>>>> 2. What's your plan in terms of release to include SDF? We have several IOs in preparation and I wonder if it's worth starting to use the new SDF API or not.
>>>>>>>>>>>>
>>>>>>>>>>>> 3. What's the impact for the runner writers? The runners will have to support SDF, which could be tricky depending on the execution engine. In the worst case, where the runner can't fully support SDF, does it mean that most of our IOs will be useless?
>>>>>>>>>>>>
>>>>>>>>>>>> Just my dumb topics ;)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> See you at 8am!
>>>>>>>>>>>>
>>>>>>>>>>>> Regards
>>>>>>>>>>>> JB
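A sketch of the packaging style referred to in point 1 and in the meeting notes above (package IOs as PTransforms; the implementation under the hood can be anything); KafkaSdfIO, KafkaRecord, ListPartitionsFn and ReadPartitionFn are hypothetical names, and the expansion hook is shown loosely - the point is only that the with* configuration surface stays the same whether the internals are a Source or a (splittable) ParDo:

    public class KafkaSdfIO {
      public static Read read() { return new Read(); }

      public static class Read extends PTransform<PBegin, PCollection<KafkaRecord>> {
        private String topic;

        // Same user-facing configuration style as today's IOs.
        public Read withTopic(String topic) { this.topic = topic; return this; }

        @Override
        public PCollection<KafkaRecord> expand(PBegin input) {
          return input
              .apply(Create.of(topic))
              .apply(ParDo.of(new ListPartitionsFn()))  // emit partitions + restrictions
              .apply(ParDo.of(new ReadPartitionFn()));  // splittable reader DoFn
        }
      }
    }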
>>>>>>>>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello everybody,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just a reminder: the meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST; to join the call go to https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
>>>>>>>>>>>>> I intend to go over the proposed design and then have a free-form discussion.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please have a skim through the proposal doc: https://s.apache.org/splittable-do-fn
>>>>>>>>>>>>> I also made some slides that are basically a trimmed-down version of the doc to use as a guide when conducting the meeting: https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing .
>>>>>>>>>>>>>
>>>>>>>>>>>>> I will post notes from the meeting on this thread afterwards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, looking forward.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is pretty cool! I'll be there too. (unless the hangout gets too full -- if so, I'll drop out in favor of others who aren't lucky enough to get to talk to Eugene all the time.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1 I'll join
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> + 1, me2
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> +1 as in I'll join ;-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Sounds good, thanks!
>>>>>>>>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST, https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What about Friday 19th?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Sounds great, does the suggested time over videoconference work for you?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Eugene
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> May we talk together next week? I like the proposal. I would just need some details for my understanding.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> What are your thoughts on this?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain more about this proposal if necessary, since I understand it is a lot to digest.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts? (link: https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn - I confirmed that it can be joined without being logged into a Google account)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Who'd be interested in attending, and does this time/date work for people?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> It sounds like you are concerned about continued support for existing IOs people have developed, and about backward compatibility?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all existing Source-based connectors will continue to work [though the document proposes at some point to make Read.from(Source) translate to a wrapper SDF under the hood, to exercise the feature more and to make sure that it is strictly more powerful - but this is an optional implementation detail].
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly - "replacing the Source API": a better phrasing would be "introducing a new API so powerful and easy-to-use that hopefully people will choose it over the Source API all the time, even though they don't have to" :) And we can discuss whether or not to actually deprecate/remove the Source API at some point down the road, once it becomes clear whether this is the case or not.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> To give more context: this proposal came out of discussions within the SDK team over the past ~1.5 years, before the Beam project existed, on how to make major improvements to the Source API; perhaps it will clarify things if I give a history of the ideas discussed:
>>>>>>>>>>>>>>>>>>>>>>> - The first idea was to introduce a Read.from(PCollection<Source>) transform while keeping the Source API intact - this, given appropriate implementation, would solve most of the scalability and composability issues of IOs. Then most connectors would look like: ParDo<A, Source<B>> + Read.from().
>>>>>>>>>>>>>>>>>>>>>>> - Then we figured that the Source class is an unnecessary abstraction, as it simply holds data. What if we only had a Reader<S, B> class, where S is the source type and B the output type? Then connectors would be something like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
>>>>>>>>>>>>>>>>>>>>>>> - Then somebody remarked that some of the features of Source are useful to ParDo's as well: e.g. ability to report progress when processing a very heavy element, or ability to produce very large output in parallel.
>>>>>>>>>>>>>>>>>>>>>>> - The two previous bullets were already hinting that the Read.using() primitive might not be so special: it just takes S and produces B: isn't that what a ParDo does, plus some source magic, minus the convenience of c.output() vs. the start/advance() state machine?
>>>>>>>>>>>>>>>>>>>>>>> - At this point it became clear that we should explore unifying sources and ParDo's, in particular: can we bring the magic of sources to ParDo's but without the limitations and coding inconveniences? And this is how SplittableDoFn was born: bringing source magic to a DoFn by providing a RangeTracker.
>>>>>>>>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it became clear that it is strictly more general than sources; at least, in the respect that sources have to produce output, while DoFn's don't: an SDF may very well produce no output at all, and simply perform a side effect in a parallel/resumable way.
>>>>>>>>>>>>>>>>>>>>>>> - Then there were countless hours of discussions on unifying the bounded/unbounded cases, on the particulars of RangeTracker APIs reconciling parallelization and checkpointing, on what the relation between SDF and DoFn should be, etc. They culminated in the current proposal. The proposal comes at a time when a couple of key ingredients are (almost) ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers proposal to enable unbounded work per element.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> To put it shortly:
>>>>>>>>>>>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and will support writing new ones, possibly forever. There is no interference with current users of Source.
>>>>>>>>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source API, taken to its logical limit, where it turns out that users' goals can be accomplished more easily and more generically entirely within ParDo's.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Let me know what you think, and thanks again!
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Eugene,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Just a question: why is this in DoFn and not an improvement of Source?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will have to refactor all existing IO: basically, what you propose is to remove all Sources and replace them with NewDoFn.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I'm concerned with this approach, especially in terms of timing: clearly, IO is the area where we have to move forward in Beam, as it will allow new users to get started on their projects.
>>>>>>>>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC, ... and some people started to learn the IO API (Bounded/Unbounded source, etc).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API (Source) instead of introducing a NewDoFn.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> What are your thoughts for an IO writer like me? ;)
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hello Beam community,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would like to propose "Splittable DoFn" - a major generalization of DoFn, which allows processing of a single element to be non-monolithic, i.e. checkpointable and parallelizable, as well as doing an unbounded amount of work per element.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> This allows effectively replacing the current Bounded/UnboundedSource APIs with DoFn's that are much easier to code, more scalable and composable with the rest of the Beam programming model, and enables many use cases that were previously difficult or impossible, as well as some non-obvious new use cases.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA [BEAM-65] and some Beam meetings, and now the whole thing is written up in a document: https://s.apache.org/splittable-do-fn
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Here are some things that become possible with Splittable DoFn:
>>>>>>>>>>>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of files
>>>>>>>>>>>>>>>>>>>>>>>>> - Read a collection of files that are produced by an earlier step in the pipeline (e.g. easily implement a connector to a storage system that can export itself to files)
>>>>>>>>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list partitions" DoFn with a DoFn that simply polls a consumer and outputs new records in a while() loop
>>>>>>>>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that incrementally returns new files in a directory and a DoFn that tails a file
>>>>>>>>>>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common" algorithm (matrix squaring) with good work balancing
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka reader written against this API:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> ProcessContinuation processElement(
>>>>>>>>>>>>>>>>>>>>>>>>>     ProcessContext context, OffsetRangeTracker tracker) {
>>>>>>>>>>>>>>>>>>>>>>>>>   try (KafkaConsumer<String, String> consumer =
>>>>>>>>>>>>>>>>>>>>>>>>>       Kafka.subscribe(context.element().topic,
>>>>>>>>>>>>>>>>>>>>>>>>>                       context.element().partition)) {
>>>>>>>>>>>>>>>>>>>>>>>>>     consumer.seek(tracker.start());
>>>>>>>>>>>>>>>>>>>>>>>>>     while (true) {
>>>>>>>>>>>>>>>>>>>>>>>>>       ConsumerRecords<String, String> records = consumer.poll(100ms);
>>>>>>>>>>>>>>>>>>>>>>>>>       if (records == null) return done();
>>>>>>>>>>>>>>>>>>>>>>>>>       for (ConsumerRecord<String, String> record : records) {
>>>>>>>>>>>>>>>>>>>>>>>>>         if (!tracker.tryClaim(record.offset())) {
>>>>>>>>>>>>>>>>>>>>>>>>>           return resume().withFutureOutputWatermark(record.timestamp());
>>>>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>>>>         context.output(record);
>>>>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The document describes in detail the motivations behind this feature, the basic idea and API, open questions, and outlines an incremental delivery plan.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new DoFn [new-do-fn] and is loosely related to "State and Timers for DoFn" [beam-state].
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Please take a look and comment!
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
>>>>>>>>>>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
>>>>>>>>>>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
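For completeness, a sketch of the "list partitions" half of the composition mentioned in the bullet list above, which would feed the hypothetical reader snippet; KafkaPartition and listPartitions() are made-up placeholders:

    class ListPartitionsFn extends DoFn<String, KV<String, KafkaPartition>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String topic = c.element();
        for (int partition : listPartitions(topic)) {  // hypothetical helper
          // Unique key per partition, plus the initial restriction [0, inf)
          // that the downstream splittable reader will start from.
          c.output(KV.of(topic + "#" + partition,
                         new KafkaPartition(topic, partition, 0L)));
        }
      }
    }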
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
