Thanks! Looking forward to this work.

On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:
> Thanks for the update Eugene.
>
> I will work on the Spark runner with Amit.
>
> Regards
> JB
>
> On Feb 7, 2017, at 19:12, Eugene Kirpichov <[email protected]> wrote:
> >Hello,
> >
> >I'm almost done adding support for Splittable DoFn
> ><http://s.apache.org/splittable-do-fn> to the Dataflow streaming
> >runner*, and I'm very excited about that. There's only 1 PR
> ><https://github.com/apache/beam/pull/1898> remaining, plus enabling
> >some tests.
> >
> >* (the batch runner is much harder, because it's not yet quite clear
> >to me how to properly implement liquid sharding
> ><https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> >with SDF - and the current API is not ready for that yet)
> >
> >After implementing all the runner-agnostic parts of Splittable DoFn,
> >I found them quite easy to integrate into the Dataflow streaming
> >runner, and I think this means it should be easy to integrate into
> >other runners too.
> >
> >====== Why it'd be cool ======
> >The general benefits of SDF are well described in the design doc
> >(linked above).
> >As for right now: if we integrated SDF with all runners, it would
> >already let us start greatly simplifying the code of existing
> >streaming connectors (CountingInput, Kafka, Pubsub, JMS) and writing
> >new connectors (e.g. a really nice one to implement would be a
> >"directory watcher" that continuously returns new files in a
> >directory).
> >
> >As a teaser, here's the complete implementation of an "unbounded
> >counter" I used for my test of the Dataflow runner integration:
> >
> >  class CountFn extends DoFn<String, Integer> {
> >    @ProcessElement
> >    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
> >      // Claim each offset before outputting it; stop when the runner
> >      // declines a claim (e.g. at a checkpoint or split).
> >      for (int i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
> >        c.output(i);
> >      }
> >      return ProcessContinuation.resume();
> >    }
> >
> >    @GetInitialRestriction
> >    public OffsetRange getInitialRange(String element) {
> >      return new OffsetRange(0, Integer.MAX_VALUE);
> >    }
> >
> >    @NewTracker
> >    public OffsetRangeTracker newTracker(OffsetRange range) {
> >      return new OffsetRangeTracker(range);
> >    }
> >  }
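(Aside for readers trying the teaser out: a minimal sketch of wiring CountFn into a pipeline. The class name CountFnDemo and the dummy "tick" element are invented for illustration, CountFn from the email above is assumed to be in scope, and this is not the actual Dataflow test referenced; since CountFn keeps resuming, the output is unbounded and needs a runner with streaming support.)

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.ParDo;

  public class CountFnDemo {
    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
      p.apply(Create.of("tick"))          // a single dummy element that seeds the SDF
       .apply(ParDo.of(new CountFn()));   // the runner expands this into SplittableParDo
      p.run();
    }
  }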
> >====== What I'm asking ======
> >So, I'd like to ask for help integrating SDF into the Spark, Flink
> >and Apex runners from people who are intimately familiar with them -
> >specifically, I was hoping that, best case, I could nerd-snipe some
> >of you into taking over the integration of SDF with your favorite
> >runner ;)
> >
> >The proper set of people seems to be +Aljoscha Krettek
> ><[email protected]>, +Maximilian Michels <[email protected]>,
> >[email protected] <[email protected]>, +Amit Sela
> ><[email protected]> and +Thomas Weise, unless I forgot somebody.
> >
> >Average case, I was looking for runner-specific guidance on how to
> >do it myself.
> >
> >====== If you want to help ======
> >If somebody decides to take this over in my absence (I'll be mostly
> >gone for ~the next month), the best people to ask for implementation
> >advice are +Kenn Knowles <[email protected]> and +Daniel Mills
> ><[email protected]>.
> >
> >For reference, here's how SDF is implemented in the direct runner:
> >- The direct runner overrides ParDo.of() for a splittable DoFn
> ><https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
> >and replaces it with SplittableParDo
> ><https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
> >(the common transform expansion).
> >- SplittableParDo uses two runner-specific primitive transforms:
> >"GBKIntoKeyedWorkItems" and "SplittableProcessElements". The direct
> >runner overrides the first one like this
> ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
> >and directly implements evaluation of the second one like this
> ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
> >using runner hooks introduced in this PR
> ><https://github.com/apache/beam/pull/1824>. At the core of the hooks
> >is "ProcessFn", which is like a regular DoFn but has to be prepared
> >at runtime with some hooks (state, timers, and runner access to the
> >RestrictionTracker) before you invoke it. I added a convenience
> >implementation of the hook that mimics the behavior of
> >UnboundedSource.
> >- The relevant runner-agnostic tests are in SplittableDoFnTest
> ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> >
> >That's all it takes, really - the runner has to implement these two
> >transforms. When I looked at the Spark and Flink runners, it was not
> >quite clear to me how to implement the GBKIntoKeyedWorkItems
> >transform - e.g. the Spark runner currently doesn't use
> >KeyedWorkItem at all - but it seems definitely possible.
> >
> >Thanks!
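(For runner authors wondering what GBKIntoKeyedWorkItems has to produce: conceptually it is a group-by-key whose output bundles all pending work for a key together, so the runner can attach per-key state and timers when it invokes ProcessFn. Below is a toy, plain-Java sketch of that grouping idea; SimpleKeyedWorkItem and GbkIntoKeyedWorkItemsSketch are names invented here, not Beam's actual runners-core KeyedWorkItem, which additionally carries timers and windowing information.)

  import java.util.ArrayList;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  // Simplified stand-in for a keyed work item: one key plus all work
  // currently pending for that key.
  class SimpleKeyedWorkItem<K, V> {
    final K key;
    final List<V> elements;

    SimpleKeyedWorkItem(K key, List<V> elements) {
      this.key = key;
      this.elements = elements;
    }
  }

  class GbkIntoKeyedWorkItemsSketch {
    // Group (key, value) pairs so that each key's work arrives as a single
    // unit - the property that lets a runner maintain per-key state across
    // ProcessFn invocations.
    static <K, V> List<SimpleKeyedWorkItem<K, V>> group(List<Map.Entry<K, V>> input) {
      Map<K, List<V>> byKey = new LinkedHashMap<>();
      for (Map.Entry<K, V> e : input) {
        byKey.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
      }
      List<SimpleKeyedWorkItem<K, V>> out = new ArrayList<>();
      byKey.forEach((k, values) -> out.add(new SimpleKeyedWorkItem<>(k, values)));
      return out;
    }
  }

In a streaming runner the grouping happens per bundle or trigger firing rather than over a complete list, but the invariant is the same: everything for one key lands in one place.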
