Yep, thanks, that answers it.
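
For the archive, here is a minimal sketch of the "compose with a canned
deduping transform" idea from Eugene's reply quoted below. It is plain Java,
not Beam code: Rec, readWithPossibleDuplicates() and dedupBy() are
hypothetical stand-ins for a connector's output type, an SDF-based read that
may redeliver records, and a reusable dedup step. It assumes every record
carries an id that identifies duplicates.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class DedupSketch {
    /** Hypothetical record type: an id used for deduplication plus a payload. */
    static final class Rec {
        final String id;
        final String payload;

        Rec(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Stand-in for a read whose output may contain duplicates. */
    static List<Rec> readWithPossibleDuplicates() {
        List<Rec> out = new ArrayList<>();
        out.add(new Rec("a", "first"));
        out.add(new Rec("b", "second"));
        out.add(new Rec("a", "first")); // same record delivered a second time
        return out;
    }

    /** Stand-in for a canned dedup step: keeps the first element seen per id. */
    static <T, K> List<T> dedupBy(List<T> input, Function<T, K> idFn) {
        Set<K> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T element : input) {
            // Set.add returns false when the id was already present.
            if (seen.add(idFn.apply(element))) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The connector author composes the two steps explicitly.
        List<Rec> deduped = dedupBy(readWithPossibleDuplicates(), r -> r.id);
        for (Rec r : deduped) {
            System.out.println(r.id + " " + r.payload);
        }
    }
}
```

The point of the sketch is only that the dedup step sits outside the read and
is chosen by the connector author. A real unbounded pipeline would also have
to bound how long ids are remembered, which this toy version ignores.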

On Fri, 5 Aug 2016 at 19:51 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

> Hi Aljoscha,
>
> AFAIK, the effect of .requiresDeduping() is that the runner inserts a
> GBK/dedup transform on top of the read. This seems entirely compatible with
> SDF, except it will be decoupled from the SDF itself: if an SDF produces
> output that potentially contains duplicates, and there's no easy way to fix
> it in the SDF itself, and you (developer of the connector) would like to
> eliminate them, you can explicitly compose the SDF with a canned deduping
> transform. Does this address your question?
>
> Thanks!
>
> On Fri, Aug 5, 2016 at 7:14 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > I really like the proposal, especially how it unifies a lot of things.
> >
> > One question: How would this work with sources that (right now) return
> > true from UnboundedSource.requiresDeduping()? As I understand it, the code
> > that executes such sources has to do bookkeeping to ensure that we don't
> > get duplicate values. Would we add such a feature for the output of DoFns,
> > or would we work towards removing the deduping functionality from Beam and
> > push it into the source implementations?
> >
> > Cheers,
> > Aljoscha
> >
> > On Fri, 5 Aug 2016 at 03:27 Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> > > By the way, I like the use cases you are introducing: we discussed
> > > similar use cases with Dan.
> > >
> > > I just wonder about the existing IO.
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On August 4, 2016 7:46:14 PM Eugene Kirpichov
> > > <kirpic...@google.com.INVALID> wrote:
> > >
> > > > Hello Beam community,
> > > >
> > > > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > > > "Splittable DoFn" - a major generalization of DoFn, which allows
> > > > processing of a single element to be non-monolithic, i.e.
> > > > checkpointable and parallelizable, as well as doing an unbounded
> > > > amount of work per element.
> > > >
> > > > This allows effectively replacing the current Bounded/UnboundedSource
> > > > APIs with DoFns that are much easier to code, more scalable and
> > > > composable with the rest of the Beam programming model, and enables
> > > > many use cases that were previously difficult or impossible, as well
> > > > as some non-obvious new use cases.
> > > >
> > > > This proposal has been mentioned before in JIRA [BEAM-65] and some
> > > > Beam meetings, and now the whole thing is written up in a document:
> > > >
> > > >         https://s.apache.org/splittable-do-fn
> > > >
> > > > Here are some things that become possible with Splittable DoFn:
> > > > - Efficiently read a filepattern matching millions of files
> > > > - Read a collection of files that are produced by an earlier step in
> > > > the pipeline (e.g. easily implement a connector to a storage system
> > > > that can export itself to files)
> > > > - Implement a Kafka reader by composing a "list partitions" DoFn with
> > > > a DoFn that simply polls a consumer and outputs new records in a
> > > > while() loop
> > > > - Implement a log tailer by composing a DoFn that incrementally
> > > > returns new files in a directory and a DoFn that tails a file
> > > > - Implement a parallel "count friends in common" algorithm (matrix
> > > > squaring) with good work balancing
> > > >
> > > > Here is the meaningful part of a hypothetical Kafka reader written
> > > > against this API:
> > > >
> > > >     ProcessContinuation processElement(
> > > >             ProcessContext context, OffsetRangeTracker tracker) {
> > > >       try (KafkaConsumer<String, String> consumer =
> > > >                 Kafka.subscribe(context.element().topic,
> > > >                                 context.element().partition)) {
> > > >         consumer.seek(tracker.start());
> > > >         while (true) {
> > > >           ConsumerRecords<String, String> records =
> > > >               consumer.poll(100ms);
> > > >           if (records == null) return done();
> > > >           for (ConsumerRecord<String, String> record : records) {
> > > >             if (!tracker.tryClaim(record.offset())) {
> > > >               return resume()
> > > >                   .withFutureOutputWatermark(record.timestamp());
> > > >             }
> > > >             context.output(record);
> > > >           }
> > > >         }
> > > >       }
> > > >     }
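
A minimal sketch of one way to read the tryClaim() contract in the example
above. It is a toy, not the proposal's OffsetRangeTracker: ToyOffsetTracker
and its checkpoint() method are hypothetical, and the sketch assumes that a
checkpoint shrinks the range to what has already been claimed, so that the
next tryClaim() fails and processElement() returns.

```java
public class OffsetTrackerSketch {
    /** Toy tracker over the half-open offset range [start, end). */
    static final class ToyOffsetTracker {
        private final long start;
        private long end;          // exclusive; shrinks when a checkpoint is taken
        private long lastClaimed;  // start - 1 until the first successful claim

        ToyOffsetTracker(long start, long end) {
            this.start = start;
            this.end = end;
            this.lastClaimed = start - 1;
        }

        long start() {
            return start;
        }

        /** Called by the DoFn before emitting the record at this offset. */
        synchronized boolean tryClaim(long offset) {
            if (offset >= end) {
                return false; // outside the range this call still owns
            }
            lastClaimed = offset;
            return true;
        }

        /**
         * Called by the (hypothetical) runner. Keeps what was claimed so far
         * and returns the unprocessed remainder as {from, toExclusive}.
         */
        synchronized long[] checkpoint() {
            long residualStart = lastClaimed + 1;
            long[] residual = {residualStart, end};
            end = residualStart;
            return residual;
        }
    }

    public static void main(String[] args) {
        ToyOffsetTracker tracker = new ToyOffsetTracker(0, 10);
        tracker.tryClaim(0);
        tracker.tryClaim(1);
        tracker.tryClaim(2);
        long[] residual = tracker.checkpoint();
        System.out.println("residual: [" + residual[0] + ", " + residual[1] + ")");
        System.out.println("claim 3 after checkpoint: " + tracker.tryClaim(3));
    }
}
```

Under that assumption the residual range can be handed to another
processElement() call, which is what would make a single element's work
checkpointable.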
> > > >
> > > > The document describes in detail the motivations behind this feature,
> > > > the basic idea and API, open questions, and outlines an incremental
> > > > delivery plan.
> > > >
> > > > The proposed API builds on the reflection-based new DoFn [new-do-fn]
> > > > and is loosely related to "State and Timers for DoFn" [beam-state].
> > > >
> > > > Please take a look and comment!
> > > >
> > > > Thanks.
> > > >
> > > > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > > > [new-do-fn] https://s.apache.org/a-new-do-fn
> > > > [beam-state] https://s.apache.org/beam-state
> > >
> > >
> > >
> >
>
