I really like the proposal, especially how it unifies a lot of things. One question: how would this work with sources that (right now) return true from UnboundedSource.requiresDeduping()? As I understand it, the code that executes such sources has to do bookkeeping to ensure that we don't get duplicate values. Would we add such a feature for the output of DoFns, or would we work towards removing the deduping functionality from Beam and pushing it into the source implementations?
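For context, here is a rough sketch (plain Java with hypothetical names, not actual Beam SDK code) of the kind of bookkeeping I mean: the runner remembers the record IDs it has already seen from the source and silently drops repeats. Real runners would of course bound this state somehow rather than keep every ID forever.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of runner-side dedup bookkeeping for a source
// whose requiresDeduping() returns true. Not Beam code; a toy sketch that
// keeps all seen IDs in memory (a real runner would bound this state).
class DedupingEmitter<T> {
  private final Set<String> seenIds = new HashSet<>();
  private final List<T> emitted = new ArrayList<>();

  // Emit the value only if its record ID has not been seen before.
  void emit(String recordId, T value) {
    if (seenIds.add(recordId)) {
      emitted.add(value);
    }
  }

  List<T> emitted() {
    return emitted;
  }
}
```

The question is whether this responsibility would move to the DoFn machinery or into the source implementations themselves.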
Cheers,
Aljoscha

On Fri, 5 Aug 2016 at 03:27 Jean-Baptiste Onofré <[email protected]> wrote:

> By the way I like the use cases you are introducing: we discussed about
> similar use cases with Dan.
>
> Just wonder about the existing IO.
>
> Regards
> JB
>
> On August 4, 2016 7:46:14 PM Eugene Kirpichov <[email protected]> wrote:
>
> > Hello Beam community,
> >
> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > "Splittable DoFn" - a major generalization of DoFn, which allows
> > processing of a single element to be non-monolithic, i.e. checkpointable
> > and parallelizable, as well as doing an unbounded amount of work per
> > element.
> >
> > This allows effectively replacing the current Bounded/UnboundedSource
> > APIs with DoFn's that are much easier to code, more scalable and
> > composable with the rest of the Beam programming model, and enables
> > many use cases that were previously difficult or impossible, as well as
> > some non-obvious new use cases.
> >
> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> > meetings, and now the whole thing is written up in a document:
> >
> > https://s.apache.org/splittable-do-fn
> >
> > Here are some things that become possible with Splittable DoFn:
> > - Efficiently read a filepattern matching millions of files
> > - Read a collection of files that are produced by an earlier step in
> >   the pipeline (e.g. easily implement a connector to a storage system
> >   that can export itself to files)
> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
> >   DoFn that simply polls a consumer and outputs new records in a
> >   while() loop
> > - Implement a log tailer by composing a DoFn that incrementally returns
> >   new files in a directory and a DoFn that tails a file
> > - Implement a parallel "count friends in common" algorithm (matrix
> >   squaring) with good work balancing
> >
> > Here is the meaningful part of a hypothetical Kafka reader written
> > against this API:
> >
> > ProcessContinuation processElement(
> >     ProcessContext context, OffsetRangeTracker tracker) {
> >   try (KafkaConsumer<String, String> consumer =
> >       Kafka.subscribe(context.element().topic,
> >                       context.element().partition)) {
> >     consumer.seek(tracker.start());
> >     while (true) {
> >       ConsumerRecords<String, String> records = consumer.poll(100ms);
> >       if (records == null) return done();
> >       for (ConsumerRecord<String, String> record : records) {
> >         if (!tracker.tryClaim(record.offset())) {
> >           return resume().withFutureOutputWatermark(record.timestamp());
> >         }
> >         context.output(record);
> >       }
> >     }
> >   }
> > }
> >
> > The document describes in detail the motivations behind this feature,
> > the basic idea and API, open questions, and outlines an incremental
> > delivery plan.
> >
> > The proposed API builds on the reflection-based new DoFn [new-do-fn]
> > and is loosely related to "State and Timers for DoFn" [beam-state].
> >
> > Please take a look and comment!
> >
> > Thanks.
> >
> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > [new-do-fn] https://s.apache.org/a-new-do-fn
> > [beam-state] https://s.apache.org/beam-state
