Hi,

Unfortunately I will be in Ireland on August 15th. What about Friday the 19th?
Regards
JB

On Aug 11, 2016, at 23:22, Eugene Kirpichov <[email protected]> wrote:

> Hi JB,
>
> Sounds great, does the suggested time over videoconference work for you?
>
> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <[email protected]> wrote:
>
>> Hi Eugene,
>>
>> May we talk together next week? I like the proposal. I would just need some details for my understanding.
>>
>> Thanks
>> Regards
>> JB
>>
>> On Aug 11, 2016, at 19:46, Eugene Kirpichov <[email protected]> wrote:
>>
>> > Hi JB,
>> >
>> > What are your thoughts on this?
>> >
>> > I'm also thinking of having a virtual meeting to explain more about this proposal if necessary, since I understand it is a lot to digest.
>> >
>> > How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts? (link: https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn - I confirmed that it can be joined without being logged into a Google account)
>> >
>> > Who'd be interested in attending, and does this time/date work for people?
>> >
>> > On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <[email protected]> wrote:
>> >
>> >> Hi JB, thanks for reading and for your comments!
>> >>
>> >> It sounds like you are concerned about continued support for existing IO's people have developed, and about backward compatibility?
>> >>
>> >> We do not need to remove the Source API, and all existing Source-based connectors will continue to work [though the document proposes at some point to make Read.from(Source) translate to a wrapper SDF under the hood, to exercise the feature more and to make sure that it is strictly more powerful - but this is an optional implementation detail].
>> >>
>> >> Perhaps the document phrases this too strongly - "replacing the Source API": a better phrasing would be "introducing a new API so powerful and easy-to-use that hopefully people will choose it over the Source API all the time, even though they don't have to" :) And we can discuss whether or not to actually deprecate/remove the Source API at some point down the road, once it becomes clear whether this is the case or not.
>> >>
>> >> To give more context: this proposal came out of discussions within the SDK team over the past ~1.5 years, before the Beam project existed, on how to make major improvements to the Source API; perhaps it will clarify things if I give a history of the ideas discussed:
>> >> - The first idea was to introduce a Read.from(PCollection<Source>) transform while keeping the Source API intact - this, given appropriate implementation, would solve most of the scalability and composability issues of IO's. Then most connectors would look like: ParDo<A, Source<B>> + Read.from().
>> >> - Then we figured that the Source class is an unnecessary abstraction, as it simply holds data. What if we only had a Reader<S, B> class, where S is the source type and B the output type? Then connectors would be something like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
>> >> - Then somebody remarked that some of the features of Source are useful to ParDo's as well: e.g. ability to report progress when processing a very heavy element, or ability to produce very large output in parallel.
>> >> - The two previous bullets were already hinting that the Read.using() primitive might not be so special: it just takes S and produces B: isn't that what a ParDo does, plus some source magic, minus the convenience of c.output() vs. the start/advance() state machine?
>> >> - At this point it became clear that we should explore unifying sources and ParDo's, in particular: can we bring the magic of sources to ParDo's but without the limitations and coding inconveniences? And this is how SplittableDoFn was born: bringing source magic to a DoFn by providing a RangeTracker.
>> >> - Once the idea of "splittable DoFn's" was born, it became clear that it is strictly more general than sources; at least, in the respect that sources have to produce output, while DoFn's don't: an SDF may very well produce no output at all, and simply perform a side effect in a parallel/resumable way.
>> >> - Then there were countless hours of discussions on unifying the bounded/unbounded cases, on the particulars of RangeTracker APIs reconciling parallelization and checkpointing, what the relation between SDF and DoFn should be, etc. They culminated in the current proposal. The proposal comes at a time when a couple of key ingredients are (almost) ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers proposal to enable unbounded work per element.
>> >>
>> >> To put it shortly:
>> >> - Yes, we will support existing Source connectors, and will support writing new ones, possibly forever. There is no interference with current users of Source.
>> >> - The new API is an attempt to improve the Source API, taken to its logical limit, where it turns out that users' goals can be accomplished more easily and more generically entirely within ParDo's.
>> >>
>> >> Let me know what you think, and thanks again!
>> >>
>> >> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <[email protected]> wrote:
>> >>
>> >>> Hi Eugene,
>> >>>
>> >>> Just a question: why is it in DoFn and not an improvement of Source?
>> >>>
>> >>> If I understand correctly, it means that we will have to refactor all existing IOs: basically, what you propose is to remove all Sources and replace them with NewDoFn.
>> >>>
>> >>> I'm concerned about this approach, especially in terms of timing: clearly, IO is the area where we have to move forward in Beam, as it will allow new users to get started on their projects.
>> >>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC, ... and some people started to learn the IO API (Bounded/Unbounded source, etc).
>> >>>
>> >>> I think it would make more sense to enhance the IO API (Source) instead of introducing a NewDoFn.
>> >>>
>> >>> What are your thoughts for IO writers like me? ;)
>> >>>
>> >>> Regards
>> >>> JB
>> >>>
>> >>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>> >>> > Hello Beam community,
>> >>> >
>> >>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose "Splittable DoFn" - a major generalization of DoFn, which allows processing of a single element to be non-monolithic, i.e. checkpointable and parallelizable, as well as doing an unbounded amount of work per element.
>> >>> > This allows effectively replacing the current Bounded/UnboundedSource APIs with DoFn's that are much easier to code, more scalable and composable with the rest of the Beam programming model, and enables many use cases that were previously difficult or impossible, as well as some non-obvious new use cases.
>> >>> >
>> >>> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam meetings, and now the whole thing is written up in a document:
>> >>> >
>> >>> > https://s.apache.org/splittable-do-fn
>> >>> >
>> >>> > Here are some things that become possible with Splittable DoFn:
>> >>> > - Efficiently read a filepattern matching millions of files
>> >>> > - Read a collection of files that are produced by an earlier step in the pipeline (e.g. easily implement a connector to a storage system that can export itself to files)
>> >>> > - Implement a Kafka reader by composing a "list partitions" DoFn with a DoFn that simply polls a consumer and outputs new records in a while() loop
>> >>> > - Implement a log tailer by composing a DoFn that incrementally returns new files in a directory and a DoFn that tails a file
>> >>> > - Implement a parallel "count friends in common" algorithm (matrix squaring) with good work balancing
>> >>> >
>> >>> > Here is the meaningful part of a hypothetical Kafka reader written against this API:
>> >>> >
>> >>> > ProcessContinuation processElement(
>> >>> >     ProcessContext context, OffsetRangeTracker tracker) {
>> >>> >   try (KafkaConsumer<String, String> consumer =
>> >>> >       Kafka.subscribe(context.element().topic,
>> >>> >                       context.element().partition)) {
>> >>> >     consumer.seek(tracker.start());
>> >>> >     while (true) {
>> >>> >       ConsumerRecords<String, String> records = consumer.poll(100ms);
>> >>> >       if (records == null) return done();
>> >>> >       for (ConsumerRecord<String, String> record : records) {
>> >>> >         if (!tracker.tryClaim(record.offset())) {
>> >>> >           return resume().withFutureOutputWatermark(record.timestamp());
>> >>> >         }
>> >>> >         context.output(record);
>> >>> >       }
>> >>> >     }
>> >>> >   }
>> >>> > }
>> >>> >
>> >>> > The document describes in detail the motivations behind this feature, the basic idea and API, open questions, and outlines an incremental delivery plan.
>> >>> >
>> >>> > The proposed API builds on the reflection-based new DoFn [new-do-fn] and is loosely related to "State and Timers for DoFn" [beam-state].
>> >>> >
>> >>> > Please take a look and comment!
>> >>> >
>> >>> > Thanks.
>> >>> >
>> >>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
>> >>> > [new-do-fn] https://s.apache.org/a-new-do-fn
>> >>> > [beam-state] https://s.apache.org/beam-state
>> >>>
>> >>> --
>> >>> Jean-Baptiste Onofré
>> >>> [email protected]
>> >>> http://blog.nanthrax.net
>> >>> Talend - http://www.talend.com
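
For concreteness, here is a rough sketch of the composition described in the quoted use-case list (a "list partitions" DoFn followed by a splittable per-partition reader), expanding on the hypothetical snippet above. OffsetRangeTracker, ProcessContinuation, resume()/withFutureOutputWatermark() and the exact @ProcessElement shape are taken from that snippet and are proposal-level names, not a released API; KafkaPartition, ListPartitionsFn, ReadPartitionFn and consumerConfig() are illustrative assumptions.

import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Sketch only: OffsetRangeTracker, ProcessContinuation and resume() are from
// the hypothetical snippet in the quoted proposal and cannot be imported from
// a released SDK. consumerConfig() is an assumed helper standing in for
// ordinary Kafka consumer properties (bootstrap servers, deserializers, ...).

/** Element type describing one Kafka partition: a topic name plus a partition number. */
class KafkaPartition implements Serializable {
  final String topic;
  final int partition;
  KafkaPartition(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }
}

/** Ordinary DoFn: expands a topic name into its partitions ("list partitions"). */
class ListPartitionsFn extends DoFn<String, KafkaPartition> {
  @ProcessElement
  public void process(ProcessContext c) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig())) {
      for (PartitionInfo p : consumer.partitionsFor(c.element())) {
        c.output(new KafkaPartition(c.element(), p.partition()));
      }
    }
  }
}

/** Splittable DoFn: polls one partition indefinitely, checkpointable via the tracker. */
class ReadPartitionFn extends DoFn<KafkaPartition, ConsumerRecord<String, String>> {
  @ProcessElement
  public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
    TopicPartition tp = new TopicPartition(c.element().topic, c.element().partition);
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig())) {
      consumer.assign(Collections.singletonList(tp));
      // Start from the offset the tracker says we own; how the initial offset
      // range is supplied to the DoFn is one of the open API details in the doc.
      consumer.seek(tp, tracker.start());
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
          if (!tracker.tryClaim(record.offset())) {
            // The runner took a checkpoint; stop here and resume later from the tracker.
            return resume().withFutureOutputWatermark(record.timestamp());
          }
          c.output(record);
        }
      }
    }
  }
}

// The whole "Kafka reader" is then just two chained ParDos:
//
//   p.apply(Create.of("my_topic"))
//    .apply(ParDo.of(new ListPartitionsFn()))
//    .apply(ParDo.of(new ReadPartitionFn()));

The point of this shape is that the "source" is now just an element flowing through the pipeline, so listing partitions and reading them compose like any other ParDos.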
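
And to make the first bullet of the history-of-ideas in the quoted thread concrete, the discarded "ParDo<A, Source<B>> + Read.from()" shape would have looked roughly like the sketch below. The Read.from(PCollection<Source>) overload, matchFiles() and makeSourceForFile() are purely illustrative assumptions; no such API exists.

import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative only: a connector as an ordinary DoFn that emits Source objects,
// followed by a hypothetical Read.from(PCollection<Source>) that expands them.
class FilePatternToSources extends DoFn<String, BoundedSource<String>> {
  @ProcessElement
  public void process(ProcessContext c) throws Exception {
    // One Source per file matched by the filepattern; matching details omitted.
    for (String file : matchFiles(c.element())) {   // matchFiles() is assumed
      c.output(makeSourceForFile(file));            // makeSourceForFile() is assumed
    }
  }
}

// PCollection<String> lines = filepatterns
//     .apply(ParDo.of(new FilePatternToSources()))              // ParDo<A, Source<B>>
//     .apply(Read.from(/* PCollection<Source> - hypothetical overload */));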
