Great news! I'd like to to try and see how we can do this for the Flink runner once I have some time.
On Thu, 13 Oct 2016 at 07:40 Jean-Baptiste Onofré <[email protected]> wrote: > Great !!!!! > > Let me experiment a bit in SDF (especially in the IO). > > I keep you posted. > > Regards > JB > > On 10/13/2016 02:55 AM, Eugene Kirpichov wrote: > > Hey all, > > > > An update: https://github.com/apache/incubator-beam/pull/896 has been > > merged, laying groundwork and adding support for splittable DoFn to the > > in-memory runner. > > > > What this PR does: > > - It defines an API, in full accordance with the proposal discussed on > this > > thread. > > - It adds a mostly runner-agnostic expansion of the ParDo transform for a > > splittable DoFn, with one runner-specific primitive transform that needs > to > > be overridden by every runner. > > - It overrides said transform in the in-memory runner, so this works > > end-to-end in the in-memory runner. > > - All this code is covered by tests (unit and integration > > @RunnableOnService) and appears to work properly in combination with the > > rest of the Beam model: e.g., inputs to a splittable DoFn can be > windowed, > > and their windows and timestamps are transparently propagated. > > > > Caveats: > > - The API is marked @Experimental, but this is an understatement: it is > > assumed to be in flux and is not intended to be used yet. Overwhelmingly > > likely, it *will* change in incompatible ways. DO NOT write pipelines > with > > this transform yet. > > - It only works in the in-memory runner: the vast majority of code is > > runner-agnostic, but a central runner-specific primitive transform is > only > > overridden by the in-memory runner. > > > > My immediate next plan is to make this work in the Cloud Dataflow > streaming > > runner (since this is the runner I'm most familiar with), in order to get > > experience with what kind of runner hooks are needed and to put the API > in > > shape for adding hooks for other runners - and then work either myself or > > with the community on making it work in other runners too. Once all > runners > > sufficiently support a particular subset of features, we can start > > transitioning some connectors or writing new ones using that subset (I > > expect that streaming connectors will come first). > > > > Additionally, the Python SDK is considering using Splittable DoFn as the > > *only* API for streaming sources (right now it doesn't have any API for > > that, so there's no compatibility concerns). No implementation work has > > happened yet, but it seems like a good idea. > > > > On Tue, Aug 30, 2016 at 1:45 AM Aljoscha Krettek <[email protected]> > > wrote: > > > >> Thanks for the explanation Eugene and JB. > >> > >> By the way, I'm not trying to find holes in this, I really like the > >> feature. I just sometimes wonder how a specific thing might be > implemented > >> with this. > >> > >> On Mon, 29 Aug 2016 at 18:00 Eugene Kirpichov > <[email protected] > >>> > >> wrote: > >> > >>> Hi Aljoscha, > >>> > >>> The watermark reporting is done via > >>> ProcessContinuation.futureOutputWatermark, at the granularity of > >> returning > >>> from individual processElement() calls - you return from the call and > >> give > >>> a watermark on your future output. We assume that updating watermark is > >>> sufficient at a per-bundle level (or, if not, then that you can make > >>> bundles small enough) cause that's the same level at which state > changes, > >>> timers etc. are committed. > >>> It can be implemented by setting a per-key watermark hold and updating > it > >>> when each call for this element returns. That's the way it is > implemented > >>> in my current prototype > >> https://github.com/apache/incubator-beam/pull/896 > >>> (see > >>> SplittableParDo.ProcessFn) > >>> > >>> On Mon, Aug 29, 2016 at 2:55 AM Aljoscha Krettek <[email protected]> > >>> wrote: > >>> > >>>> Hi, > >>>> I have another question about this: currently, unbounded sources have > >>>> special logic for determining the watermark and the system > periodically > >>>> asks the sources for the current watermark. As I understood it, > >>> watermarks > >>>> are only "generated" at the sources. How will this work when sources > >> are > >>>> implemented as a combination of DoFns and SplittableDoFns? Will > >>>> SplittableDoFns be asked for a watermark, does this mean that > >> watermarks > >>>> can then be "generated" at any operation? > >>>> > >>>> Cheers, > >>>> Aljoscha > >>>> > >>>> On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov > >>> <[email protected] > >>>>> > >>>> wrote: > >>>> > >>>>> Hi JB, > >>>>> > >>>>> Yes, I'm assuming you're referring to the "magic" part on the > >> transform > >>>>> expansion diagram. This is indeed runner-specific, and timers+state > >> are > >>>>> likely the simplest way to do this for an SDF that does unbounded > >>> amount > >>>> of > >>>>> work. > >>>>> > >>>>> On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré < > >> [email protected] > >>>> > >>>>> wrote: > >>>>> > >>>>>> Anyway, from a runner perspective, we will have kind of API (part > >> of > >>>> the > >>>>>> Runner API) to "orchestrate" the SDF as we discussed during the > >> call, > >>>>>> right ? > >>>>>> > >>>>>> Regards > >>>>>> JB > >>>>>> > >>>>>> On 08/21/2016 07:24 PM, Eugene Kirpichov wrote: > >>>>>>> Hi Aljoscha, > >>>>>>> This is an excellent question! And the answer is, we don't need > >> any > >>>> new > >>>>>>> concepts like "SDF executor" and can rely on the per-key state > >> and > >>>>> timers > >>>>>>> machinery that already exists in all runners because it's > >> necessary > >>>> to > >>>>>>> implement windowing/triggering properly. > >>>>>>> > >>>>>>> Note that this is already somewhat addressed in the previously > >>> posted > >>>>>> State > >>>>>>> and Timers proposal https://s.apache.org/beam-state , under > >>> "per-key > >>>>>>> workflows". > >>>>>>> > >>>>>>> Think of it this way, using the Kafka example: we'll expand it > >>> into a > >>>>>>> transform: > >>>>>>> > >>>>>>> (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) > >> for > >>>>>>> partition in topic.listPartitions() } > >>>>>>> (2) GroupByKey > >>>>>>> (3) ParDo { key, topic, partition, R -> Kafka reader code in the > >>>>>>> proposal/slides } > >>>>>>> - R is the OffsetRange restriction which in this case will be > >>>> always > >>>>> of > >>>>>>> the form [startOffset, inf). > >>>>>>> - there'll be just 1 value per key, but we use GBK to just get > >>>> access > >>>>>> to > >>>>>>> the per-key state/timers machinery. This may be runner-specific; > >>>> maybe > >>>>>> some > >>>>>>> runners don't need a GBK to do that. > >>>>>>> > >>>>>>> Now suppose the topic has two partitions, P1 and P2, and they get > >>>>>> assigned > >>>>>>> unique keys K1, K2. > >>>>>>> Then the input to (3) will be a collection of: (K1, topic, P1, > >> [0, > >>>>> inf)), > >>>>>>> (K2, topic, P2, [0, inf)). > >>>>>>> Suppose we have just 1 worker with just 1 thread. Now, how will > >>> this > >>>>>> thread > >>>>>>> be able to produce elements from both P1 and P2? here's how. > >>>>>>> > >>>>>>> The thread will process (K1, topic, P1, [0, inf)), checkpoint > >>> after a > >>>>>>> certain time or after a certain number of elements are output > >> (just > >>>>> like > >>>>>>> with the current UnboundedSource reading code) producing a > >> residual > >>>>>>> restriction R1' (basically a new start timestamp), put R11 into > >> the > >>>>>> per-key > >>>>>>> state and set a timer T1 to resume. > >>>>>>> Then it will process (K2, topic, P2, [0, inf)), do the same > >>>> producing a > >>>>>>> residual restriction R2' and setting a timer T2 to resume. > >>>>>>> Then timer T1 will fire in the context of the key K1. The thread > >>> will > >>>>>> call > >>>>>>> processElement again, this time supplying R1' as the restriction; > >>> the > >>>>>>> process repeats and after a while it checkpoints and stores R1'' > >>> into > >>>>>> state > >>>>>>> of K1. > >>>>>>> Then timer T2 will fire in the context of K2, run processElement > >>> for > >>>> a > >>>>>>> while, set a new timer and store R2'' into the state of K2. > >>>>>>> Etc. > >>>>>>> If partition 1 goes away, the processElement call will return "do > >>> not > >>>>>>> resume", so a timer will not be set and instead the state > >>> associated > >>>>> with > >>>>>>> K1 will be GC'd. > >>>>>>> > >>>>>>> So basically it's almost like cooperative thread scheduling: > >> things > >>>> run > >>>>>> for > >>>>>>> a while, until the runner tells them to checkpoint, then they > >> set a > >>>>> timer > >>>>>>> to resume themselves, and the runner fires the timers, and the > >>>> process > >>>>>>> repeats. And, again, this only requires things that runners can > >>>> already > >>>>>> do > >>>>>>> - state and timers, but no new concept of SDF executor (and > >>>>> consequently > >>>>>> no > >>>>>>> necessity to choose/tune how many you need). > >>>>>>> > >>>>>>> Makes sense? > >>>>>>> > >>>>>>> On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek < > >>>> [email protected]> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi, > >>>>>>>> I have another question that I think wasn't addressed in the > >>>> meeting. > >>>>> At > >>>>>>>> least it wasn't mentioned in the notes. > >>>>>>>> > >>>>>>>> In the context of replacing sources by a combination of to SDFs, > >>> how > >>>>> do > >>>>>> you > >>>>>>>> determine how many "SDF executor" instances you need downstream? > >>> For > >>>>> the > >>>>>>>> sake of argument assume that both SDFs are executed with > >>>> parallelism 1 > >>>>>> (or > >>>>>>>> one per worker). Now, if you have a file source that reads from > >> a > >>>>> static > >>>>>>>> set of files the first SDF would emit the filenames while the > >>> second > >>>>> SDF > >>>>>>>> would receive the filenames and emit their contents. This works > >>> well > >>>>> and > >>>>>>>> the downstream SDF can process one filename after the other. > >> Now, > >>>>> think > >>>>>> of > >>>>>>>> something like a Kafka source. The first SDF would emit the > >>>> partitions > >>>>>> (say > >>>>>>>> 4 partitions, in this example) and the second SDF would be > >>>> responsible > >>>>>> for > >>>>>>>> reading from a topic and emitting elements. Reading from one > >> topic > >>>>> never > >>>>>>>> finishes so you can't process the topics in series. I think you > >>>> would > >>>>>> need > >>>>>>>> to have 4 downstream "SDF executor" instances. The question now > >>> is: > >>>>> how > >>>>>> do > >>>>>>>> you determine whether you are in the first or the second > >>> situation? > >>>>>>>> > >>>>>>>> Probably I'm just overlooking something and this is already > >> dealt > >>>> with > >>>>>>>> somewhere... :-) > >>>>>>>> > >>>>>>>> Cheers, > >>>>>>>> Aljoscha > >>>>>>>> > >>>>>>>> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]> > >>>> wrote: > >>>>>>>> > >>>>>>>>> Hello, > >>>>>>>>> > >>>>>>>>> Thanks for the notes both Dan and Eugene, and for taking the > >> time > >>>> to > >>>>> do > >>>>>>>> the > >>>>>>>>> presentation and answer our questions. > >>>>>>>>> > >>>>>>>>> I mentioned the ongoing work on dynamic scaling on Flink > >> because > >>> I > >>>>>>>> suppose > >>>>>>>>> that it will address dynamic rebalancing eventually (there are > >>>>> multiple > >>>>>>>>> changes going on for dynamic scaling). > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4 > >>>>>>>>> > >>>>>>>>> > >>>>> > >> https://lists.apache.org/[email protected]:lte=1M:FLIP-8 > >>>>>>>>> > >>>>>>>>> Anyway I am far from an expert on flink, but probably the flink > >>>> guys > >>>>>> can > >>>>>>>>> give their opinion about this and refer to a more precise > >>> document > >>>>> that > >>>>>>>> the > >>>>>>>>> ones I mentioned.. > >>>>>>>>> > >>>>>>>>> Thanks again, > >>>>>>>>> Ismaël > >>>>>>>>> > >>>>>>>>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré < > >>>>> [email protected] > >>>>>>> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Great summary Eugene and Dan. > >>>>>>>>>> > >>>>>>>>>> And thanks again for the details, explanation, and discussion. > >>>>>>>>>> > >>>>>>>>>> Regards > >>>>>>>>>> JB > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote: > >>>>>>>>>> > >>>>>>>>>>> Thanks for attending, everybody! > >>>>>>>>>>> > >>>>>>>>>>> Here are meeting notes (thanks Dan!). > >>>>>>>>>>> > >>>>>>>>>>> Q: Will SplittableDoFn enable better repartitioning of the > >>>>>>>> input/output > >>>>>>>>>>> data? > >>>>>>>>>>> A: Not really; repartitioning is orthogonal to SDF. > >>>>>>>>>>> > >>>>>>>>>>> Current Source API suffers from lack of composition and > >>>> scalability > >>>>>>>>>>> because > >>>>>>>>>>> we treat sources too much as metadata, not enough as data. > >>>>>>>>>>> > >>>>>>>>>>> Q(slide with transform expansion): who does the "magic"? > >>>>>>>>>>> A: The runner. Checkpointing and dynamically splitting > >>>> restrictions > >>>>>>>> will > >>>>>>>>>>> require collaboration with the runner. > >>>>>>>>>>> > >>>>>>>>>>> Q: How does the runner interact with the DoFn to control the > >>>>>>>>> restrictions? > >>>>>>>>>>> Is it related to the centralized job tracker etc.? > >>>>>>>>>>> A: RestrictionTracker is a simple helper object, that exists > >>>> purely > >>>>>> on > >>>>>>>>> the > >>>>>>>>>>> worker while executing a single partition, and interacts with > >>> the > >>>>>>>> worker > >>>>>>>>>>> harness part of the runner. Not to be confused with the > >>>> centralized > >>>>>>>> job > >>>>>>>>>>> tracker (master) - completely unrelated. Worker harness, of > >>>> course, > >>>>>>>>>>> interacts with the master in some relevant ways (e.g. > >> Dataflow > >>>>> master > >>>>>>>>> can > >>>>>>>>>>> tell "you're a straggler, you should split"). > >>>>>>>>>>> > >>>>>>>>>>> Q: Is this a new DoFn subclass, or how will this integrate > >> with > >>>> the > >>>>>>>>>>> existing code? > >>>>>>>>>>> A: It's a feature of reflection-based DoFn ( > >>>>>>>>> https://s.apache.org/a-new-do > >>>>>>>>>>> fn) > >>>>>>>>>>> - just another optional parameter of type RestrictionTracker > >> to > >>>>>>>>>>> processElement() which is dynamically bound via reflection, > >> so > >>>>> fully > >>>>>>>>>>> backward/forward compatible, and looks to users like a > >> regular > >>>>> DoFn. > >>>>>>>>>>> > >>>>>>>>>>> Q: why is fractionClaimed a double? > >>>>>>>>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, > >>>> dynamic > >>>>>>>>>>> rebalancing) requires a uniform way to represent progress > >>> through > >>>>>>>>>>> different > >>>>>>>>>>> sources. > >>>>>>>>>>> > >>>>>>>>>>> Q: Spark runner is microbatch-based, so this seems to map > >> well > >>>> onto > >>>>>>>>>>> checkpoint/resume, right? > >>>>>>>>>>> A: Yes; actually the Dataflow runner is, at a worker level, > >>> also > >>>>>>>>>>> microbatch-based. The way SDF interacts with a runner will be > >>>> very > >>>>>>>>> similar > >>>>>>>>>>> to how a Bounded/UnboundedSource interacts with a runner. > >>>>>>>>>>> > >>>>>>>>>>> Q: Using SDF, what would be the "packaging" of the IO? > >>>>>>>>>>> A: Same as currently: package IO's as PTransforms and their > >>>>>>>>> implementation > >>>>>>>>>>> under the hood can be anything: Source, simple ParDo's, SDF, > >>> etc. > >>>>>> E.g. > >>>>>>>>>>> Datastore was recently refactored from BoundedSource to ParDo > >>>>> (ended > >>>>>>>> up > >>>>>>>>>>> simpler and more scalable), transparently to users. > >>>>>>>>>>> > >>>>>>>>>>> Q: What's the timeline; what to do with the IOs currently in > >>>>>>>>> development? > >>>>>>>>>>> A: Timeline is O(months). Keep doing what you're doing and > >>>> working > >>>>> on > >>>>>>>>> top > >>>>>>>>>>> of Source APIs when necessary and simple ParDo's otherwise. > >>>>>>>>>>> > >>>>>>>>>>> Q: What's the impact for the runner writers? > >>>>>>>>>>> A: Tentatively expected that most of the code for running an > >>> SDF > >>>>> will > >>>>>>>> be > >>>>>>>>>>> common to runners, with some amount of per-runner glue code, > >>> just > >>>>>> like > >>>>>>>>>>> GBK/windowing/triggering. Impact on Dataflow runner is larger > >>>> since > >>>>>> it > >>>>>>>>>>> supports dynamic rebalancing in batch mode and this is the > >>>> hardest > >>>>>>>> part, > >>>>>>>>>>> but for other runners shouldn't be too hard. > >>>>>>>>>>> > >>>>>>>>>>> JB: Talend has people who can help with this: e.g. help > >>> integrate > >>>>>> into > >>>>>>>>>>> Spark runner, refactor IOs etc. Amit also willing to chat > >> about > >>>>>>>>> supporting > >>>>>>>>>>> SDF in Spark runner. > >>>>>>>>>>> > >>>>>>>>>>> Ismael: There's a Flink proposal about dynamic rebalancing. > >>>> Ismael > >>>>>>>> will > >>>>>>>>>>> send a link. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré < > >>>>>> [email protected] > >>>>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Hi Eugene, > >>>>>>>>>>>> > >>>>>>>>>>>> thanks for the reminder. > >>>>>>>>>>>> > >>>>>>>>>>>> Just to prepare some topics for the call, please find some > >>>> points: > >>>>>>>>>>>> > >>>>>>>>>>>> 1. Using SDF, what would be the "packaging" of the IO ? It > >>>> sounds > >>>>> to > >>>>>>>> me > >>>>>>>>>>>> that we can keep the IO packaging style (using with* setters > >>> for > >>>>> the > >>>>>>>> IO > >>>>>>>>>>>> configuration) and replace PTransform, Source, Reader, ... > >>>>> directly > >>>>>>>>> with > >>>>>>>>>>>> SDF. Correct ? > >>>>>>>>>>>> > >>>>>>>>>>>> 2. What's your plan in term of release to include SDF ? We > >>> have > >>>>>>>> several > >>>>>>>>>>>> IOs in preparation and I wonder if it's worth to start to > >> use > >>>> the > >>>>>> new > >>>>>>>>>>>> SDF API or not. > >>>>>>>>>>>> > >>>>>>>>>>>> 3. What's the impact for the runner writers ? The runners > >> will > >>>>> have > >>>>>>>> to > >>>>>>>>>>>> support SDF, that could be tricky depending of the execution > >>>>> engine. > >>>>>>>> In > >>>>>>>>>>>> the worst case where the runner can't fully support SDF, > >> does > >>> it > >>>>>> mean > >>>>>>>>>>>> that most of our IOs will be useless ? > >>>>>>>>>>>> > >>>>>>>>>>>> Just my dumb topics ;) > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks, > >>>>>>>>>>>> See you at 8am ! > >>>>>>>>>>>> > >>>>>>>>>>>> Regards > >>>>>>>>>>>> JB > >>>>>>>>>>>> > >>>>>>>>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hello everybody, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Just a reminder: > >>>>>>>>>>>>> > >>>>>>>>>>>>> The meeting is happening tomorrow - Friday Aug 19th, > >> 8am-9am > >>>> PST, > >>>>>> to > >>>>>>>>>>>>> join > >>>>>>>>>>>>> the call go to > >>>>>>>>>>>>> > >>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn > >>>>> . > >>>>>>>>>>>>> I intend to go over the proposed design and then have a > >>>> free-form > >>>>>>>>>>>>> discussion. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Please have a skim through the proposal doc: > >>>>> https://s.apache.org/ > >>>>>>>>>>>>> splittable-do-fn > >>>>>>>>>>>>> I also made some slides that are basically a trimmed-down > >>>> version > >>>>>> of > >>>>>>>>> the > >>>>>>>>>>>>> doc to use as a guide when conducting the meeting, > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047 > >>>>>>>>>>>> Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing > >>>>>>>>>>>> > >>>>>>>>>>>>> . > >>>>>>>>>>>>> > >>>>>>>>>>>>> I will post notes from the meeting on this thread > >> afterwards. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, looking forward. > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin > >>>>>>>>>>>>> <[email protected] > >>>>>>>>>>>>> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> This is pretty cool! I'll be there too. (unless the hangout > >>>> gets > >>>>>> too > >>>>>>>>>>>>>> > >>>>>>>>>>>>> full > >>>>>>>>>>>> > >>>>>>>>>>>>> -- if so, I'll drop out in favor of others who aren't lucky > >>>>> enough > >>>>>>>> to > >>>>>>>>>>>>>> > >>>>>>>>>>>>> get > >>>>>>>>>>>> > >>>>>>>>>>>>> to talk to Eugene all the time.) > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis < > >>>>>>>>>>>>>> > >>>>>>>>>>>>> [email protected]> > >>>>>>>>>>>> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> +1 I'll join > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) < > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> [email protected] > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> + 1, me2 > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected] > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> <javascript:;>> > >>>>>>>>>>>> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> +1 as in I'll join ;-) > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> <[email protected] > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Sounds good, thanks! > >>>>>>>>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST, > >>>>>>>>>>>>>>>>>> > >> https://staging.talkgadget.google.com/hangouts/_/google > >>> . > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> com/splittabledofn > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré > >> < > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> [email protected] > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> <javascript:;>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. > >> What > >>>>> about > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Friday > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 19th ? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Regards > >>>>>>>>>>>>>>>>>>> JB > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov > >>>>>>>>>>>>>>>>>>> <[email protected]> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi JB, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Sounds great, does the suggested time over > >>>> videoconference > >>>>>>>> work > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> you? > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste > >> Onofré > >>> < > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> [email protected] <javascript:;>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Eugene > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> May we talk together next week ? I like the > >>> proposal. I > >>>>>>>> would > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> just > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> need > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> some details for my understanding. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks > >>>>>>>>>>>>>>>>>>>>> Regards > >>>>>>>>>>>>>>>>>>>>> JB > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov > >>>>>>>>>>>>>>>>>>>>> <[email protected]> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi JB, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> What are your thoughts on this? > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to > >>>> explain > >>>>>>>> more > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> about > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>> proposal if necessary, since I understand it is a > >>> lot > >>>> to > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> digest. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, > >> over > >>>>>>>>> Hangouts? > >>>>>>>>>>>>>>>>>>>>>> (link: > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>> https://staging.talkgadget.google.com/hangouts/_/google. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> com/splittabledofn > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> - > >>>>>>>>>>>>>>>>>>>>>> I confirmed that it can be joined without being > >>> logged > >>>>>>>> into a > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Google > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> account) > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Who'd be interested in attending, and does this > >>>>> time/date > >>>>>>>>> work > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> people? > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> <[email protected] <javascript:;>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments! > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> It sounds like you are concerned about continued > >>>>> support > >>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> IO's > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> people have developed, and about backward > >>>>> compatibility? > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all > >>>>> existing > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Source-based > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> connectors will continue to work [though the > >>> document > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> proposes > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> at > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> some > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> point to make Read.from(Source) to translate to a > >>>>> wrapper > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> SDF > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> under > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> hood, to exercise the feature more and to make > >> sure > >>>>> that > >>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> strictly > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> more powerful - but this is an optional > >>>> implementation > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> detail]. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly - > >>>>>>>> "replacing > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Source > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> API": a better phrasing would be "introducing a > >> new > >>>> API > >>>>>> so > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> powerful > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> easy-to-use that hopefully people will choose it > >>> over > >>>>> the > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Source > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> API > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> all > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> the time, even though they don't have to" :) And > >> we > >>>> can > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> discuss > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> whether or > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> not to actually deprecate/remove the Source API > >> at > >>>> some > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> point > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> down > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> road, once it becomes clear whether this is the > >>> case > >>>> or > >>>>>>>> not. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> To give more context: this proposal came out of > >>>>>>>> discussions > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> within > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> the SDK > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam > >>>> project > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> existed, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> on > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> how to > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> make major improvements to the Source API; > >> perhaps > >>> it > >>>>>> will > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> clarify > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> things > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> if I give a history of the ideas discussed: > >>>>>>>>>>>>>>>>>>>>>>> - The first idea was to introduce a > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Read.from(PCollection<Source>) > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> transform while keeping the Source API intact - this, > >>> given > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> appropriate > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> implementation, would solve most of the > >> scalability > >>>> and > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> composability > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> issues of IO's. Then most connectors would look > >> like > >>> : > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> ParDo<A, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Source<B>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> + Read.from(). > >>>>>>>>>>>>>>>>>>>>>>> - Then we figured that the Source class is an > >>>>> unnecessary > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> abstraction, as > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> it simply holds data. What if we only had a > >>> Reader<S, > >>>>> B> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> class > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> where > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> S is > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> the source type and B the output type? Then > >>>> connectors > >>>>>>>> would > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> something > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical > >>> Read.using(Reader<S, > >>>>>> B>). > >>>>>>>>>>>>>>>>>>>>>>> - Then somebody remarked that some of the > >> features > >>> of > >>>>>>>> Source > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> are > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> useful to > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress > >>> when > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> processing a > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> very > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> heavy element, or ability to produce very large > >>>> output > >>>>> in > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> parallel. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> - The two previous bullets were already hinting > >> that > >>>> the > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Read.using() > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> primitive might not be so special: it just takes S > >>> and > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> produces > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> B: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> isn't > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> that what a ParDo does, plus some source magic, > >>> minus > >>>>> the > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> convenience > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> c.output() vs. the start/advance() state machine? > >>>>>>>>>>>>>>>>>>>>>>> - At this point it became clear that we should > >>>> explore > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> unifying > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> sources > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> and ParDo's, in particular: can we bring the > >> magic > >>> of > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> sources > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> ParDo's > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> but without the limitations and coding > >>>> inconveniences? > >>>>>> And > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> how > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic > >> to a > >>>>> DoFn > >>>>>>>> by > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> providing > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> RangeTracker. > >>>>>>>>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, > >> it > >>>>>> became > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> clear > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> is strictly more general than sources; at least, > >> in > >>>> the > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> respect > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> sources have to produce output, while DoFn's don't: > >>> an > >>>>> SDF > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> may > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> very > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> well > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> produce no output at all, and simply perform a > >> side > >>>>>> effect > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> parallel/resumable way. > >>>>>>>>>>>>>>>>>>>>>>> - Then there were countless hours of discussions > >> on > >>>>>>>> unifying > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> bounded/unbounded cases, on the particulars of > >>> RangeTracker > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> APIs > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> reconciling parallelization and checkpointing, what the > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> relation > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> between > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the > >>>>> current > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> proposal. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> proposal comes at a time when a couple of key > >>>>> ingredients > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> are > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> (almost) > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular > >>> DoFn, > >>>>> and > >>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> State/Timers > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> proposal to enable unbounded work per element. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> To put it shortly: > >>>>>>>>>>>>>>>>>>>>>>> - Yes, we will support existing Source > >> connectors, > >>>> and > >>>>>>>> will > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> support > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> writing new ones, possibly forever. There is no > >>>>>> interference > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> current > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> users of Source. > >>>>>>>>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source > >>>> API, > >>>>>>>> taken > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> its > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> logical limit where it turns out that users' goals > >>> can > >>>> be > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> accomplished > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> easier and more generically entirely within > >>> ParDo's. > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Let me know what you think, and thanks again! > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste > >> Onofré > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> <[email protected] <javascript:;>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Hi Eugene, > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Just a question: why is it in DoFn and note an > >>>>>>>> improvement > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Source > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> ? > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will > >>>> have > >>>>> to > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> refactore > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> all > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> existing IO: basically, what you propose is to > >>> remove > >>>>> all > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> Source > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> replace with NewDoFn. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> I'm concern with this approach, especially in > >> term > >>>> of > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> timing: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> clearly, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> the IO is the area where we have to move forward > >> in > >>>>> Beam > >>>>>>>> as > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> allow new users to start in their projects. > >>>>>>>>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, > >>>>> Cassandra, > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> MongoDB, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> JDBC, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> ... and some people started to learn the IO API > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> (Bounded/Unbouded > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> source, etc). > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the > >> IO > >>>> API > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> (Source) > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> instead > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> of introducing a NewDoFn. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> What are your thoughts for IO writer like me ? > >> ;) > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Regards > >>>>>>>>>>>>>>>>>>>>>>>> JB > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote: > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Hello Beam community, > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) > >>> would > >>>>>> like > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> propose > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of > >> DoFn, > >>>>> which > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> allows > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> processing > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> checkpointable > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded > >> amount > >>> of > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> work > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> per > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> element. > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> This allows effectively replacing the current > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> Bounded/UnboundedSource > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> APIs > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more > >>>>> scalable > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> composable > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> with > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> the rest of the Beam programming model, and > >>> enables > >>>>>> many > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> use > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> cases > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> were previously difficult or impossible, as well > >> as > >>>>> some > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> non-obvious new > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> use cases. > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA > >>>>>>>> [BEAM-65] > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> some > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Beam > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> meetings, and now the whole thing is written up > >> in > >>> a > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> document: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> https://s.apache.org/splittable-do-fn > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> Here are some things that become possible with > >>>>>>>> Splittable > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> DoFn: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> files > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> - Read a collection of files that are produced by an > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> earlier > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> step > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> in the > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a > >>>>> storage > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> system > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> that can > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> export itself to files) > >>>>>>>>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> partitions" > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> DoFn > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> with a > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new > >>>>> records > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> while() > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>> loop > >>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn > >> that > >>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>> incrementally > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> returns > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>
