Yep, thanks, that answers it.

On Fri, 5 Aug 2016 at 19:51 Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> Hi Aljoscha,
>
> AFAIK, the effect of .requiresDeduping() is that the runner inserts a
> GBK/dedup transform on top of the read. This seems entirely compatible with
> SDF, except it will be decoupled from the SDF itself: if an SDF produces
> output that potentially contains duplicates, and there's no easy way to fix
> it in the SDF itself, and you (the developer of the connector) would like
> to eliminate them, you can explicitly compose the SDF with a canned
> deduping transform. Does this address your question?
>
> Thanks!
>
> On Fri, Aug 5, 2016 at 7:14 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > I really like the proposal, especially how it unifies a lot of things.
> >
> > One question: how would this work with sources that (right now) return
> > true from UnboundedSource.requiresDeduping()? As I understand it, the
> > code that executes such sources has to do bookkeeping to ensure that we
> > don't get duplicate values. Would we add such a feature for the output of
> > DoFns, or would we work towards removing the deduping functionality from
> > Beam and pushing it into the source implementations?
> >
> > Cheers,
> > Aljoscha
> >
> > On Fri, 5 Aug 2016 at 03:27 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >
> > > By the way, I like the use cases you are introducing: we discussed
> > > similar use cases with Dan.
> > >
> > > Just wondering about the existing IO.
> > >
> > > Regards
> > > JB
> > >
> > > On August 4, 2016 7:46:14 PM Eugene Kirpichov
> > > <kirpic...@google.com.INVALID> wrote:
> > >
> > > > Hello Beam community,
> > > >
> > > > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > > > "Splittable DoFn" - a major generalization of DoFn, which allows
> > > > processing of a single element to be non-monolithic, i.e.
> > > > checkpointable and parallelizable, as well as doing an unbounded
> > > > amount of work per element.
> > > >
> > > > This allows effectively replacing the current Bounded/UnboundedSource
> > > > APIs with DoFns that are much easier to code, more scalable, and
> > > > composable with the rest of the Beam programming model, and enables
> > > > many use cases that were previously difficult or impossible, as well
> > > > as some non-obvious new use cases.
> > > >
> > > > This proposal has been mentioned before in JIRA [BEAM-65] and some
> > > > Beam meetings, and now the whole thing is written up in a document:
> > > >
> > > > https://s.apache.org/splittable-do-fn
> > > >
> > > > Here are some things that become possible with Splittable DoFn:
> > > > - Efficiently read a filepattern matching millions of files
> > > > - Read a collection of files that are produced by an earlier step in
> > > >   the pipeline (e.g. easily implement a connector to a storage system
> > > >   that can export itself to files)
> > > > - Implement a Kafka reader by composing a "list partitions" DoFn with
> > > >   a DoFn that simply polls a consumer and outputs new records in a
> > > >   while() loop
> > > > - Implement a log tailer by composing a DoFn that incrementally
> > > >   returns new files in a directory and a DoFn that tails a file
> > > > - Implement a parallel "count friends in common" algorithm (matrix
> > > >   squaring) with good work balancing
> > > >
> > > > Here is the meaningful part of a hypothetical Kafka reader written
> > > > against this API:
> > > >
> > > > ProcessContinuation processElement(
> > > >     ProcessContext context, OffsetRangeTracker tracker) {
> > > >   try (KafkaConsumer<String, String> consumer =
> > > >       Kafka.subscribe(context.element().topic,
> > > >                       context.element().partition)) {
> > > >     consumer.seek(tracker.start());
> > > >     while (true) {
> > > >       ConsumerRecords<String, String> records = consumer.poll(100);
> > > >       if (records == null) return done();
> > > >       for (ConsumerRecord<String, String> record : records) {
> > > >         if (!tracker.tryClaim(record.offset())) {
> > > >           return resume().withFutureOutputWatermark(record.timestamp());
> > > >         }
> > > >         context.output(record);
> > > >       }
> > > >     }
> > > >   }
> > > > }
> > > >
> > > > The document describes in detail the motivations behind this feature,
> > > > the basic idea and API, open questions, and outlines an incremental
> > > > delivery plan.
> > > >
> > > > The proposed API builds on the reflection-based new DoFn [new-do-fn]
> > > > and is loosely related to "State and Timers for DoFn" [beam-state].
> > > >
> > > > Please take a look and comment!
> > > >
> > > > Thanks.
> > > >
> > > > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > > > [new-do-fn] https://s.apache.org/a-new-do-fn
> > > > [beam-state] https://s.apache.org/beam-state
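[Editor's note: Eugene's answer above describes composing an SDF with a "canned deduping transform" instead of relying on UnboundedSource.requiresDeduping(). The following is a minimal plain-Java sketch of what such a dedup step does conceptually - keep the first record seen for each id, drop later duplicates. The class and method names are hypothetical and this is not the Beam API; in Beam this logic would live inside a PTransform that groups by a record id.]

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a "dedup by id" step. A LinkedHashSet keeps
// the first occurrence of each element and preserves arrival order,
// which mirrors "emit one record per id" after grouping.
public class DedupSketch {
  public static <T> List<T> dedupById(List<T> records) {
    Set<T> seen = new LinkedHashSet<>(records); // drops repeats, keeps order
    return new ArrayList<>(seen);
  }

  public static void main(String[] args) {
    // A duplicate-producing read might emit the same offset twice
    // after a checkpoint/resume; the dedup step removes the repeat.
    List<String> raw = List.of("offset-1", "offset-2", "offset-1", "offset-3");
    System.out.println(dedupById(raw)); // [offset-1, offset-2, offset-3]
  }
}
```

The point of the composition is that deduping becomes an ordinary downstream transform a connector author opts into, rather than a flag the runner must honor for every source.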