By the way I like the use cases you are introducing: we discussed about similar use cases with Dan.

Just wonder about the existing IO.


On August 4, 2016 7:46:14 PM Eugene Kirpichov <> wrote:

Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose
"Splittable DoFn" - a major generalization of DoFn, which allows processing
of a single element to be non-monolithic, i.e. checkpointable and
parallelizable, as well as doing an unbounded amount of work per element.

This allows effectively replacing the current Bounded/UnboundedSource APIs
with DoFn's that are much easier to code, more scalable and composable with
the rest of the Beam programming model, and enables many use cases that
were previously difficult or impossible, as well as some non-obvious new
use cases.

This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
meetings, and now the whole thing is written up in a document:

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the
pipeline (e.g. easily implement a connector to a storage system that can
export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a
DoFn that simply polls a consumer and outputs new records in a while() loop
- Implement a log tailer by composing a DoFn that incrementally returns new
files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix
squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against
this API:

    ProcessContinuation processElement(
            ProcessContext context, OffsetRangeTracker tracker) {
      try (KafkaConsumer<String, String> consumer =
                                context.element().partition)) {;
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(100ms);
          if (records == null) return done();
          for (ConsumerRecord<String, String> record : records) {
            if (!tracker.tryClaim(record.offset())) {
              return resume().withFutureOutputWatermark(record.timestamp());

The document describes in detail the motivations behind this feature, the
basic idea and API, open questions, and outlines an incremental delivery

The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
loosely related to "State and Timers for DoFn" [beam-state].

Please take a look and comment!



Reply via email to