Hi Aljoscha,

AFAIK, the effect of .requiresDeduping() is that the runner inserts a
GBK/dedup transform on top of the read. This seems entirely compatible with
SDF, except that the deduping will be decoupled from the SDF itself. If an SDF
produces output that may contain duplicates, there is no easy way to fix that
inside the SDF, and you (the developer of the connector) would like to
eliminate them, you can explicitly compose the SDF with a canned deduping
transform. Does this address your question?
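
To make the "compose with a canned deduping transform" idea concrete, here is a minimal plain-Java sketch of what such a step does to the output of a read. It is not Beam code: the class name DedupSketch, the method dedupById and the "id:payload" record format are all hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for a canned deduping step applied after an SDF-based read.
final class DedupSketch {

  // Keeps the first occurrence of each id and drops later repeats, preserving order.
  static <T, K> List<T> dedupById(List<T> records, Function<T, K> idFn) {
    Set<K> seen = new HashSet<>();
    List<T> out = new ArrayList<>();
    for (T record : records) {
      // Set.add returns false when the id has already been seen.
      if (seen.add(idFn.apply(record))) {
        out.add(record);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Assumed record format "id:payload"; the read emitted two records twice.
    List<String> readOutput = Arrays.asList("id1:a", "id2:b", "id1:a", "id3:c", "id2:b");
    List<String> deduped = dedupById(readOutput, r -> r.substring(0, r.indexOf(':')));
    System.out.println(deduped); // prints [id1:a, id2:b, id3:c]
  }
}
```

In a real pipeline the equivalent step would group by the record id rather than keep an in-memory set, and for an unbounded input it would presumably need some way to bound or expire the set of ids it has seen.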

Thanks!

On Fri, Aug 5, 2016 at 7:14 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> I really like the proposal, especially how it unifies a lot of things.
>
> One question: How would this work with sources that (right now) return true
> from UnboundedSource.requiresDeduping()? As I understand it, the code that
> executes such sources has to do bookkeeping to ensure that we don't get
> duplicate values. Would we add such a feature for the output of DoFns or
> would we work towards removing the deduping functionality from Beam and
> push it into the source implementations?
>
> Cheers,
> Aljoscha
>
> On Fri, 5 Aug 2016 at 03:27 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
> > By the way, I like the use cases you are introducing: we discussed
> > similar use cases with Dan.
> >
> > I just wonder about the existing IO.
> >
> > Regards
> > JB
> >
> >
> > On August 4, 2016 7:46:14 PM Eugene Kirpichov
> > <kirpic...@google.com.INVALID> wrote:
> >
> > > Hello Beam community,
> > >
> > > > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > > > "Splittable DoFn" - a major generalization of DoFn, which allows
> > > > processing of a single element to be non-monolithic, i.e. checkpointable
> > > > and parallelizable, as well as doing an unbounded amount of work per
> > > > element.
> > >
> > > > This allows effectively replacing the current Bounded/UnboundedSource APIs
> > > > with DoFn's that are much easier to code, more scalable and composable with
> > > > the rest of the Beam programming model, and enables many use cases that
> > > > were previously difficult or impossible, as well as some non-obvious new
> > > > use cases.
> > >
> > > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> > > meetings, and now the whole thing is written up in a document:
> > >
> > >         https://s.apache.org/splittable-do-fn
> > >
> > > Here are some things that become possible with Splittable DoFn:
> > > - Efficiently read a filepattern matching millions of files
> > > > - Read a collection of files that are produced by an earlier step in the
> > > > pipeline (e.g. easily implement a connector to a storage system that can
> > > > export itself to files)
> > > > - Implement a Kafka reader by composing a "list partitions" DoFn with a
> > > > DoFn that simply polls a consumer and outputs new records in a while() loop
> > > > - Implement a log tailer by composing a DoFn that incrementally returns new
> > > > files in a directory and a DoFn that tails a file
> > > - Implement a parallel "count friends in common" algorithm (matrix
> > > squaring) with good work balancing
> > >
> > > > Here is the meaningful part of a hypothetical Kafka reader written against
> > > > this API:
> > >
> > >     ProcessContinuation processElement(
> > >             ProcessContext context, OffsetRangeTracker tracker) {
> > >       try (KafkaConsumer<String, String> consumer =
> > >                 Kafka.subscribe(context.element().topic,
> > >                                 context.element().partition)) {
> > >         consumer.seek(tracker.start());
> > >         while (true) {
> > >           ConsumerRecords<String, String> records =
> consumer.poll(100ms);
> > >           if (records == null) return done();
> > >           for (ConsumerRecord<String, String> record : records) {
> > >             if (!tracker.tryClaim(record.offset())) {
> > >               return
> > resume().withFutureOutputWatermark(record.timestamp());
> > >             }
> > >             context.output(record);
> > >           }
> > >         }
> > >       }
> > >     }
> > >
> > > > The document describes in detail the motivations behind this feature, the
> > > > basic idea and API, open questions, and outlines an incremental delivery
> > > > plan.
> > >
> > > > The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
> > > > loosely related to "State and Timers for DoFn" [beam-state].
> > >
> > > Please take a look and comment!
> > >
> > > Thanks.
> > >
> > > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > > [new-do-fn] https://s.apache.org/a-new-do-fn
> > > [beam-state] https://s.apache.org/beam-state
> >
> >
> >
>
