Thanks! Looking forward to this work.

On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:

> Thanks for the update Eugene.
>
> I will work on the spark runner with Amit.
>
> Regards
> JB
>
> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
> <[email protected]> wrote:
> >Hello,
> >
> >I'm almost done adding support for Splittable DoFn
> >http://s.apache.org/splittable-do-fn to Dataflow streaming runner*, and
> >very excited about that. There's only 1 PR
> ><https://github.com/apache/beam/pull/1898> remaining, plus enabling
> >some
> >tests.
> >
> >* (batch runner is much harder because it's not yet quite clear to me
> >how
> >to properly implement liquid sharding
> ><
> https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
> >
> >with
> >SDF - and the current API is not ready for that yet)
> >
> >After implementing all the runner-agnostic parts of Splittable DoFn, I
> >found them quite easy to integrate into Dataflow streaming runner, and
> >I
> >think this means it should be easy to integrate into other runners too.
> >
> >====== Why it'd be cool ======
> >The general benefits of SDF are well-described in the design doc
> >(linked
> >above).
> >As for right now - if we integrated SDF with all runners, it'd already
> >enable us to start greatly simplifying the code of existing streaming
> >connectors (CountingInput, Kafka, Pubsub, JMS) and writing new
> >connectors
> >(e.g. a really nice one to implement would be "directory watcher", that
> >continuously returns new files in a directory).
> >
> >As a teaser, here's the complete implementation of an "unbounded
> >counter" I
> >used for my test of Dataflow runner integration:
> >
> >  class CountFn extends DoFn<String, String> {
> >    @ProcessElement
> >public ProcessContinuation process(ProcessContext c, OffsetRangeTracker
> >tracker) {
> >      for (int i = tracker.currentRestriction().getFrom();
> >tracker.tryClaim(i); ++i) c.output(i);
> >      return resume();
> >    }
> >
> >    @GetInitialRestriction
> >    public OffsetRange getInitialRange(String element) { return new
> >OffsetRange(0, Integer.MAX_VALUE); }
> >
> >    @NewTracker
> >   public OffsetRangeTracker newTracker(OffsetRange range) { return new
> >OffsetRangeTracker(range); }
> >  }
> >
> >====== What I'm asking ======
> >So, I'd like to ask for help integrating SDF into Spark, Flink and Apex
> >runners from people who are intimately familiar with them -
> >specifically, I
> >was hoping best-case I could nerd-snipe some of you into taking over
> >the
> >integration of SDF with your favorite runner ;)
> >
> >The proper set of people seems to be +Aljoscha Krettek
> ><[email protected]> +Maximilian Michels
> ><[email protected]>
> >[email protected] <[email protected]> +Amit Sela
> ><[email protected]> +Thomas
> >Weise unless I forgot somebody.
> >
> >Average-case, I was looking for runner-specific guidance on how to do
> >it
> >myself.
> >
> >====== If you want to help ======
> >If somebody decides to take this over, in my absence (I'll be mostly
> >gone
> >for ~the next month)., the best people to ask for implementation
> >advice are +Kenn
> >Knowles <[email protected]> and +Daniel Mills <[email protected]> .
> >
> >For reference, here's how SDF is implemented in the direct runner:
> >- Direct runner overrides
> ><
> https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
> >
> > ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
> ><
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
> >
> >(common
> >transform expansion)
> >- SplittableParDo uses two runner-specific primitive transforms:
> >"GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner
> >overrides the first one like this
> ><
> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
> >,
> >and directly implements evaluation of the second one like this
> ><
> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
> >,
> >using runner hooks introduced in this PR
> ><https://github.com/apache/beam/pull/1824>. At the core of the hooks is
> >"ProcessFn" which is like a regular DoFn but has to be prepared at
> >runtime
> >with some hooks (state, timers, and runner access to
> >RestrictionTracker)
> >before you invoke it. I added a convenience implementation of the hook
> >mimicking behavior of UnboundedSource.
> >- The relevant runner-agnostic tests are in SplittableDoFnTest
> ><
> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
> >
> >.
> >
> >That's all it takes, really - the runner has to implement these two
> >transforms. When I looked at Spark and Flink runners, it was not quite
> >clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g.
> >Spark runner currently doesn't use KeyedWorkItem at all - but it seems
> >definitely possible.
> >
> >Thanks!
>

Reply via email to