Thanks for the update, Eugene. I will work on the Spark runner with Amit.
Regards
JB

On Feb 7, 2017, at 19:12, Eugene Kirpichov <[email protected]> wrote:
>Hello,
>
>I'm almost done adding support for Splittable DoFn
>http://s.apache.org/splittable-do-fn to Dataflow streaming runner*, and
>very excited about that. There's only 1 PR
><https://github.com/apache/beam/pull/1898> remaining, plus enabling some
>tests.
>
>* (batch runner is much harder because it's not yet quite clear to me how
>to properly implement liquid sharding
><https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
>with SDF - and the current API is not ready for that yet)
>
>After implementing all the runner-agnostic parts of Splittable DoFn, I
>found them quite easy to integrate into Dataflow streaming runner, and I
>think this means it should be easy to integrate into other runners too.
>
>====== Why it'd be cool ======
>The general benefits of SDF are well-described in the design doc (linked
>above).
>As for right now - if we integrated SDF with all runners, it'd already
>enable us to start greatly simplifying the code of existing streaming
>connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors
>(e.g. a really nice one to implement would be "directory watcher", that
>continuously returns new files in a directory).
>
>As a teaser, here's the complete implementation of an "unbounded counter" I
>used for my test of Dataflow runner integration:
>
>  class CountFn extends DoFn<String, String> {
>    @ProcessElement
>    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
>      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>        c.output(String.valueOf(i));
>      }
>      return resume();
>    }
>
>    @GetInitialRestriction
>    public OffsetRange getInitialRange(String element) {
>      return new OffsetRange(0, Integer.MAX_VALUE);
>    }
>
>    @NewTracker
>    public OffsetRangeTracker newTracker(OffsetRange range) {
>      return new OffsetRangeTracker(range);
>    }
>  }
>
>====== What I'm asking ======
>So, I'd like to ask for help integrating SDF into Spark, Flink and Apex
>runners from people who are intimately familiar with them - specifically, I
>was hoping best-case I could nerd-snipe some of you into taking over the
>integration of SDF with your favorite runner ;)
>
>The proper set of people seems to be +Aljoscha Krettek <[email protected]>
>+Maximilian Michels <[email protected]> [email protected] <[email protected]>
>+Amit Sela <[email protected]> +Thomas Weise unless I forgot somebody.
>
>Average-case, I was looking for runner-specific guidance on how to do it
>myself.
>
>====== If you want to help ======
>If somebody decides to take this over in my absence (I'll be mostly gone
>for ~the next month), the best people to ask for implementation advice are
>+Kenn Knowles <[email protected]> and +Daniel Mills <[email protected]>.
>
>For reference, here's how SDF is implemented in the direct runner:
>- Direct runner overrides
><https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
>ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
><https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
>(common transform expansion)
>- SplittableParDo uses two runner-specific primitive transforms:
>"GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner
>overrides the first one like this
><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
>and directly implements evaluation of the second one like this
><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
>using runner hooks introduced in this PR
><https://github.com/apache/beam/pull/1824>. At the core of the hooks is
>"ProcessFn" which is like a regular DoFn but has to be prepared at runtime
>with some hooks (state, timers, and runner access to RestrictionTracker)
>before you invoke it. I added a convenience implementation of the hook
>mimicking behavior of UnboundedSource.
>- The relevant runner-agnostic tests are in SplittableDoFnTest
><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
>
>That's all it takes, really - the runner has to implement these two
>transforms. When I looked at Spark and Flink runners, it was not quite
>clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g.
>Spark runner currently doesn't use KeyedWorkItem at all - but it seems
>definitely possible.
>
>Thanks!
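
For anyone following the thread, the claim-and-resume loop in the quoted CountFn may be easier to picture with a small standalone model. The sketch below does not use Beam at all: Range, Tracker and processWithBudget are hypothetical stand-ins invented for illustration, and the "checkpoint after N outputs" policy is an assumption, not a description of what any runner actually does. It only tries to show why a loop written against tryClaim() stops cleanly once the tracker's range has been cut short, and how the unprocessed remainder can be handed back for a later call.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimLoopSketch {

    /** Hypothetical half-open offset range [from, to); a stand-in, not Beam's OffsetRange. */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        boolean isEmpty() {
            return from >= to;
        }
    }

    /** Hypothetical tracker: grants claims inside the range and can be checkpointed. */
    static final class Tracker {
        private Range range;
        private long lastClaimed;

        Tracker(Range range) {
            this.range = range;
            this.lastClaimed = range.from - 1;
        }

        /** Returns true and records the offset if it still lies inside the range. */
        synchronized boolean tryClaim(long offset) {
            if (offset >= range.to) {
                return false;
            }
            lastClaimed = offset;
            return true;
        }

        /** Truncates the range just past the last claim and returns the unprocessed rest. */
        synchronized Range checkpoint() {
            long splitAt = lastClaimed + 1;
            Range residual = new Range(splitAt, range.to);
            range = new Range(range.from, splitAt);
            return residual;
        }
    }

    /**
     * Runs the same loop shape as the quoted CountFn over one restriction.
     * Assumption: the "runner" checkpoints after maxOutputs claims.
     * Returns the residual range that a later call should resume from.
     */
    static Range processWithBudget(Range restriction, int maxOutputs, List<Long> out) {
        Tracker tracker = new Tracker(restriction);
        // If no checkpoint happens, the whole restriction is consumed and nothing is left.
        Range residual = new Range(restriction.to, restriction.to);
        int produced = 0;
        for (long i = restriction.from; tracker.tryClaim(i); ++i) {
            out.add(i);
            if (++produced == maxOutputs) {
                // After this, the next tryClaim fails and the loop exits.
                residual = tracker.checkpoint();
            }
        }
        return residual;
    }

    public static void main(String[] args) {
        List<Long> out = new ArrayList<>();
        Range todo = new Range(0, 10);
        int calls = 0;
        // Keep resuming from the residual until nothing is left.
        while (!todo.isEmpty()) {
            todo = processWithBudget(todo, 3, out);
            calls++;
        }
        System.out.println(calls + " calls produced " + out);
    }
}
```

The user code never has to know that a checkpoint happened: it just keeps asking tryClaim(), and the first refusal ends the call. In this toy model the residual is simply fed back into the next call, which is roughly the role "return resume()" plays in the quoted example.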
