Sounds good, thanks! Then Friday Aug 19th it is, 8am-9am PST, https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <[email protected]> wrote:

> Hi
>
> Unfortunately I will be in Ireland on August 15th. What about Friday 19th ?
>
> Regards
> JB
>
> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov <[email protected]> wrote:
> >Hi JB,
> >
> >Sounds great, does the suggested time over videoconference work for you?
> >
> >On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >
> >> Hi Eugene
> >>
> >> May we talk together next week ? I like the proposal. I would just need
> >> some details for my understanding.
> >>
> >> Thanks
> >> Regards
> >> JB
> >>
> >> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov <[email protected]> wrote:
> >> >Hi JB,
> >> >
> >> >What are your thoughts on this?
> >> >
> >> >I'm also thinking of having a virtual meeting to explain more about this
> >> >proposal if necessary, since I understand it is a lot to digest.
> >> >
> >> >How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> >> >(link: https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn -
> >> >I confirmed that it can be joined without being logged into a Google account)
> >> >
> >> >Who'd be interested in attending, and does this time/date work for people?
> >> >
> >> >On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <[email protected]> wrote:
> >> >
> >> >> Hi JB, thanks for reading and for your comments!
> >> >>
> >> >> It sounds like you are concerned about continued support for existing IO's
> >> >> people have developed, and about backward compatibility?
> >> >>
> >> >> We do not need to remove the Source API, and all existing Source-based
> >> >> connectors will continue to work [though the document proposes at some
> >> >> point to make Read.from(Source) translate to a wrapper SDF under the
> >> >> hood, to exercise the feature more and to make sure that it is strictly
> >> >> more powerful - but this is an optional implementation detail].
> >> >>
> >> >> Perhaps the document phrases this too strongly - "replacing the Source
> >> >> API": a better phrasing would be "introducing a new API so powerful and
> >> >> easy-to-use that hopefully people will choose it over the Source API all
> >> >> the time, even though they don't have to" :) And we can discuss whether or
> >> >> not to actually deprecate/remove the Source API at some point down the
> >> >> road, once it becomes clear whether this is the case or not.
> >> >>
> >> >> To give more context: this proposal came out of discussions within the SDK
> >> >> team over the past ~1.5 years, before the Beam project existed, on how to
> >> >> make major improvements to the Source API; perhaps it will clarify things
> >> >> if I give a history of the ideas discussed:
> >> >> - The first idea was to introduce a Read.from(PCollection<Source>)
> >> >> transform while keeping the Source API intact - this, given appropriate
> >> >> implementation, would solve most of the scalability and composability
> >> >> issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
> >> >> + Read.from().
> >> >> - Then we figured that the Source class is an unnecessary abstraction, as
> >> >> it simply holds data. What if we only had a Reader<S, B> class where S is
> >> >> the source type and B the output type? Then connectors would be something
> >> >> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> >> >> - Then somebody remarked that some of the features of Source are useful to
> >> >> ParDo's as well: e.g. ability to report progress when processing a very
> >> >> heavy element, or ability to produce very large output in parallel.
> >> >> - The two previous bullets were already hinting that the Read.using()
> >> >> primitive might not be so special: it just takes S and produces B: isn't
> >> >> that what a ParDo does, plus some source magic, minus the convenience of
> >> >> c.output() vs. the start/advance() state machine?
> >> >> - At this point it became clear that we should explore unifying sources
> >> >> and ParDo's, in particular: can we bring the magic of sources to ParDo's
> >> >> but without the limitations and coding inconveniences? And this is how
> >> >> SplittableDoFn was born: bringing source magic to a DoFn by providing a
> >> >> RangeTracker.
> >> >> - Once the idea of "splittable DoFn's" was born, it became clear that it
> >> >> is strictly more general than sources; at least, in the respect that
> >> >> sources have to produce output, while DoFn's don't: an SDF may very well
> >> >> produce no output at all, and simply perform a side effect in a
> >> >> parallel/resumable way.
> >> >> - Then there were countless hours of discussions on unifying the
> >> >> bounded/unbounded cases, on the particulars of RangeTracker APIs
> >> >> reconciling parallelization and checkpointing, what the relation between
> >> >> SDF and DF should be, etc. They culminated in the current proposal. The
> >> >> proposal comes at a time when a couple of key ingredients are (almost)
> >> >> ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
> >> >> proposal to enable unbounded work per element.
> >> >>
> >> >> To put it shortly:
> >> >> - Yes, we will support existing Source connectors, and will support
> >> >> writing new ones, possibly forever. There is no interference with current
> >> >> users of Source.
> >> >> - The new API is an attempt to improve the Source API, taken to its
> >> >> logical limit where it turns out that users' goals can be accomplished
> >> >> more easily and more generically entirely within ParDo's.
> >> >>
> >> >> Let me know what you think, and thanks again!
> >> >>
> >> >> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >> >>
> >> >>> Hi Eugene,
> >> >>>
> >> >>> Just a question: why is it in DoFn and not an improvement of Source?
> >> >>>
> >> >>> If I understand correctly, it means that we will have to refactor all
> >> >>> existing IO: basically, what you propose is to remove all Source to
> >> >>> replace with NewDoFn.
> >> >>>
> >> >>> I'm concerned with this approach, especially in terms of timing: clearly,
> >> >>> the IO is the area where we have to move forward in Beam as it will
> >> >>> allow new users to start in their projects.
> >> >>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
> >> >>> ... and some people started to learn the IO API (Bounded/Unbounded
> >> >>> source, etc).
> >> >>>
> >> >>> I think it would make more sense to enhance the IO API (Source) instead
> >> >>> of introducing a NewDoFn.
> >> >>>
> >> >>> What are your thoughts for IO writers like me? ;)
> >> >>>
> >> >>> Regards
> >> >>> JB
> >> >>>
> >> >>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> >> >>> > Hello Beam community,
> >> >>> >
> >> >>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> >> >>> > "Splittable DoFn" - a major generalization of DoFn, which allows processing
> >> >>> > of a single element to be non-monolithic, i.e.
> >> >>> > checkpointable and parallelizable, as well as doing an unbounded amount
> >> >>> > of work per element.
> >> >>> >
> >> >>> > This allows effectively replacing the current Bounded/UnboundedSource APIs
> >> >>> > with DoFn's that are much easier to code, more scalable and composable with
> >> >>> > the rest of the Beam programming model, and enables many use cases that
> >> >>> > were previously difficult or impossible, as well as some non-obvious new
> >> >>> > use cases.
> >> >>> >
> >> >>> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> >> >>> > meetings, and now the whole thing is written up in a document:
> >> >>> >
> >> >>> > https://s.apache.org/splittable-do-fn
> >> >>> >
> >> >>> > Here are some things that become possible with Splittable DoFn:
> >> >>> > - Efficiently read a filepattern matching millions of files
> >> >>> > - Read a collection of files that are produced by an earlier step in the
> >> >>> > pipeline (e.g.
> >> >>> > easily implement a connector to a storage system that can
> >> >>> > export itself to files)
> >> >>> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
> >> >>> > DoFn that simply polls a consumer and outputs new records in a while() loop
> >> >>> > - Implement a log tailer by composing a DoFn that incrementally returns new
> >> >>> > files in a directory and a DoFn that tails a file
> >> >>> > - Implement a parallel "count friends in common" algorithm (matrix
> >> >>> > squaring) with good work balancing
> >> >>> >
> >> >>> > Here is the meaningful part of a hypothetical Kafka reader written against
> >> >>> > this API:
> >> >>> >
> >> >>> >   ProcessContinuation processElement(
> >> >>> >       ProcessContext context, OffsetRangeTracker tracker) {
> >> >>> >     try (KafkaConsumer<String, String> consumer =
> >> >>> >         Kafka.subscribe(context.element().topic,
> >> >>> >                         context.element().partition)) {
> >> >>> >       consumer.seek(tracker.start());
> >> >>> >       while (true) {
> >> >>> >         ConsumerRecords<String, String> records = consumer.poll(100ms);
> >> >>> >         if (records == null) return done();
> >> >>> >         for (ConsumerRecord<String, String> record : records) {
> >> >>> >           if (!tracker.tryClaim(record.offset())) {
> >> >>> >             return resume().withFutureOutputWatermark(record.timestamp());
> >> >>> >           }
> >> >>> >           context.output(record);
> >> >>> >         }
> >> >>> >       }
> >> >>> >     }
> >> >>> >   }
> >> >>> >
> >> >>> > The document describes in detail the motivations behind this feature, the
> >> >>> > basic idea and API, open questions, and outlines an incremental delivery
> >> >>> > plan.
> >> >>> >
> >> >>> > The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
> >> >>> > loosely related to "State and Timers for DoFn" [beam-state].
> >> >>> >
> >> >>> > Please take a look and comment!
> >> >>> >
> >> >>> > Thanks.
> >> >>> >
> >> >>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> >> >>> > [new-do-fn] https://s.apache.org/a-new-do-fn
> >> >>> > [beam-state] https://s.apache.org/beam-state
> >> >>> >
> >> >>>
> >> >>> --
> >> >>> Jean-Baptiste Onofré
> >> >>> [email protected]
> >> >>> http://blog.nanthrax.net
> >> >>> Talend - http://www.talend.com
> >> >>>
> >> >>
> >> >
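
For readers who want to try the claim-then-output loop from the hypothetical Kafka reader quoted above, here is a minimal, self-contained sketch in plain Java. It is only an illustration under stated assumptions: `OffsetTracker` is a hypothetical stand-in written for this note (it is not the `OffsetRangeTracker` of the proposal or any Beam class), the "partition" is an in-memory list of offsets rather than a Kafka consumer, and the return value of `process` (an offset to resume from, or -1) stands in for the `resume()` / `done()` results of the quoted code.

```java
import java.util.ArrayList;
import java.util.List;

final class TryClaimSketch {

    /** Hypothetical tracker over the offset range [start, endExclusive). Not a Beam class. */
    static final class OffsetTracker {
        private final long start;
        private final long endExclusive;

        OffsetTracker(long start, long endExclusive) {
            this.start = start;
            this.endExclusive = endExclusive;
        }

        long start() {
            return start;
        }

        /** Returns true if the offset lies inside the range and may be processed. */
        boolean tryClaim(long offset) {
            return offset >= start && offset < endExclusive;
        }
    }

    /**
     * Walks the log from tracker.start(), emitting every offset it manages to claim.
     * Returns the first offset that could not be claimed (where another call should
     * resume), or -1 if the log was exhausted first.
     */
    static long process(List<Long> log, OffsetTracker tracker, List<Long> output) {
        for (long offset : log) {
            if (offset < tracker.start()) {
                continue; // emulate seeking to the start of the range
            }
            if (!tracker.tryClaim(offset)) {
                return offset; // stop without emitting; the rest belongs to someone else
            }
            output.add(offset);
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Long> log = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L);

        // First call owns [0, 4) and stops at the first offset outside its range.
        List<Long> first = new ArrayList<>();
        long resumeAt = process(log, new OffsetTracker(0, 4), first);

        // A second call picks up the remainder, starting where the first one stopped.
        List<Long> second = new ArrayList<>();
        long end = process(log, new OffsetTracker(resumeAt, Long.MAX_VALUE), second);

        System.out.println(first + " resumeAt=" + resumeAt);
        System.out.println(second + " end=" + end);
    }
}
```

The point of the sketch is the ordering inside the loop: an offset is emitted only after the tracker has agreed to it, so two calls over adjacent ranges never emit the same offset and never skip one.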
