I really like the proposal, especially how it unifies a lot of things.

One question: how would this work with sources that (right now) return true
from UnboundedSource.requiresDeduping()? As I understand it, the code that
executes such sources has to do bookkeeping to ensure that we don't get
duplicate values. Would we add such a feature for the output of DoFns, or
would we work towards removing the deduping functionality from Beam and
pushing it into the source implementations?
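For readers unfamiliar with the deduping referred to above: when a source
declares requiresDeduping(), the runner tracks record IDs and drops records
it has already seen. The sketch below is a minimal, illustrative version of
that bookkeeping — the class and method names are hypothetical, not Beam's
actual internals, and a real runner would also bound how long IDs are
retained.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of runner-side dedup bookkeeping for a source with
// requiresDeduping() == true. Each record carries a unique ID; a record is
// emitted only if its ID has not been seen before. Names are illustrative.
class DedupingBookkeeper {
  private final Set<String> seenIds = new HashSet<>();

  /** Returns true if the record is new and should be emitted, false if it is a duplicate. */
  boolean tryEmit(String recordId) {
    // Set.add returns false when the element was already present.
    return seenIds.add(recordId);
  }
}
```

In practice the seen-ID set cannot grow forever, which is part of why this
question — keep the feature generic, or push it into sources — matters.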

Cheers,
Aljoscha

On Fri, 5 Aug 2016 at 03:27 Jean-Baptiste Onofré <[email protected]> wrote:

> By the way, I like the use cases you are introducing: we discussed
> similar use cases with Dan.
>
> Just wonder about the existing IO.
>
> Regards
> JB
>
>
> On August 4, 2016 7:46:14 PM Eugene Kirpichov
> <[email protected]> wrote:
>
> > Hello Beam community,
> >
> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > "Splittable DoFn" - a major generalization of DoFn, which allows
> > processing of a single element to be non-monolithic, i.e. checkpointable
> > and parallelizable, as well as doing an unbounded amount of work per
> > element.
> >
> > This allows effectively replacing the current Bounded/UnboundedSource
> > APIs with DoFns that are much easier to code, more scalable, and
> > composable with the rest of the Beam programming model, and enables many
> > use cases that were previously difficult or impossible, as well as some
> > non-obvious new use cases.
> >
> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> > meetings, and now the whole thing is written up in a document:
> >
> >         https://s.apache.org/splittable-do-fn
> >
> > Here are some things that become possible with Splittable DoFn:
> > - Efficiently read a filepattern matching millions of files
> > - Read a collection of files that are produced by an earlier step in the
> > pipeline (e.g. easily implement a connector to a storage system that can
> > export itself to files)
> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
> > DoFn that simply polls a consumer and outputs new records in a while()
> > loop
> > - Implement a log tailer by composing a DoFn that incrementally returns
> > new files in a directory and a DoFn that tails a file
> > - Implement a parallel "count friends in common" algorithm (matrix
> > squaring) with good work balancing
> >
> > Here is the meaningful part of a hypothetical Kafka reader written
> > against this API:
> >
> >     ProcessContinuation processElement(
> >             ProcessContext context, OffsetRangeTracker tracker) {
> >       try (KafkaConsumer<String, String> consumer =
> >                 Kafka.subscribe(context.element().topic,
> >                                 context.element().partition)) {
> >         consumer.seek(tracker.start());
> >         while (true) {
> >           ConsumerRecords<String, String> records = consumer.poll(100);
> >           if (records.isEmpty()) return done();
> >           for (ConsumerRecord<String, String> record : records) {
> >             if (!tracker.tryClaim(record.offset())) {
> >               return resume().withFutureOutputWatermark(
> >                   record.timestamp());
> >             }
> >             context.output(record);
> >           }
> >         }
> >       }
> >     }
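The loop in the snippet above hinges on the tracker's tryClaim() contract:
the runner may truncate the claimed range at a checkpoint, after which
tryClaim() returns false and processElement() must stop and return. Below is
a minimal, self-contained sketch of that contract; it borrows the
OffsetRangeTracker name from the snippet, but the implementation is
illustrative only, not the one in the proposal.

```java
// Illustrative model of the restriction-tracker contract used above:
// offsets in [start, end) may be claimed in increasing order; a checkpoint
// shrinks the range so that subsequent claims fail and the caller resumes
// later from the residual range.
class OffsetRangeTracker {
  private final long start;
  private long end;               // exclusive upper bound of the claimable range
  private long lastClaimed = -1;  // highest offset successfully claimed so far

  OffsetRangeTracker(long start, long end) {
    this.start = start;
    this.end = end;
  }

  long start() { return start; }

  /** Claims the offset if it is still inside the range. */
  boolean tryClaim(long offset) {
    if (offset >= end) {
      return false;  // range exhausted or truncated by a checkpoint
    }
    lastClaimed = offset;
    return true;
  }

  /** Simulates a checkpoint: keeps work up to lastClaimed, returns the residual start. */
  long checkpoint() {
    long residualStart = lastClaimed + 1;
    end = residualStart;  // further tryClaim() calls will now fail
    return residualStart;
  }
}
```

The key point is that the DoFn never decides when to checkpoint; it only
honors tryClaim()'s answer, which is what makes the processing
checkpointable and parallelizable from the runner's side.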
> >
> > The document describes in detail the motivations behind this feature, the
> > basic idea and API, open questions, and outlines an incremental delivery
> > plan.
> >
> > The proposed API builds on the reflection-based new DoFn [new-do-fn] and
> > is loosely related to "State and Timers for DoFn" [beam-state].
> >
> > Please take a look and comment!
> >
> > Thanks.
> >
> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > [new-do-fn] https://s.apache.org/a-new-do-fn
> > [beam-state] https://s.apache.org/beam-state
>
>
>