I recommend proceeding with the runner-facing state & timer APIs; they are
lower-level and more appropriate for this. All runners provide them or use
runners/core implementations, as they are needed for triggering.

On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]>
wrote:

> Thanks Aljoscha!
>
> Minor note: I'm not familiar with what level of support for timers Flink
> currently has. However, SDF in the Direct and Dataflow runners currently
> does not use the user-facing state/timer APIs; rather, it uses the
> runner-facing APIs (StateInternals and TimerInternals), and perhaps Flink
> already implements these. We may want to change this, but for now it's good
> enough (besides, SDF uses watermark holds, which are not supported by the
> user-facing state API yet).
>
> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> [email protected]> wrote:
>
>> Thanks for the motivation, Eugene! :-)
>>
>> I've wanted to do this for a while now but was waiting for the Flink 1.2
>> release (which happened this week)! There's some prerequisite work to be
>> done on the Flink runner: we'll move to the new timer interfaces introduced
>> in Flink 1.2 and implement support for both the user-facing state and timer
>> APIs. This should make implementation of SDF easier.
>>
>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]>
>> wrote:
>>
>> Thanks! Looking forward to this work.
>>
>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>> Thanks for the update Eugene.
>>
>> I will work on the spark runner with Amit.
>>
>> Regards
>> JB
>>
>> On Feb 7, 2017, at 19:12, Eugene Kirpichov
>> <[email protected]> wrote:
>> >Hello,
>> >
>> >I'm almost done adding support for Splittable DoFn
>> ><http://s.apache.org/splittable-do-fn> to the Dataflow streaming runner*,
>> >and I'm very excited about that. There's only 1 PR
>> ><https://github.com/apache/beam/pull/1898> remaining, plus enabling some
>> >tests.
>> >
>> >* (batch runner is much harder because it's not yet quite clear to me how
>> >to properly implement liquid sharding
>> ><https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
>> >with SDF - and the current API is not ready for that yet)
>> >
>> >After implementing all the runner-agnostic parts of Splittable DoFn, I
>> >found them quite easy to integrate into Dataflow streaming runner, and I
>> >think this means it should be easy to integrate into other runners too.
>> >
>> >====== Why it'd be cool ======
>> >The general benefits of SDF are well-described in the design doc (linked
>> >above).
>> >As for right now - if we integrated SDF with all runners, it'd already
>> >enable us to start greatly simplifying the code of existing streaming
>> >connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors
>> >(e.g. a really nice one to implement would be "directory watcher", that
>> >continuously returns new files in a directory).
>> >
>> >As a teaser, here's the complete implementation of an "unbounded counter"
>> >I used for my test of Dataflow runner integration:
>> >
>> >  class CountFn extends DoFn<String, Long> {
>> >    @ProcessElement
>> >    public ProcessContinuation process(
>> >        ProcessContext c, OffsetRangeTracker tracker) {
>> >      // Emit consecutive offsets until the tracker refuses a claim.
>> >      for (long i = tracker.currentRestriction().getFrom();
>> >          tracker.tryClaim(i); ++i) {
>> >        c.output(i);
>> >      }
>> >      return resume();
>> >    }
>> >
>> >    @GetInitialRestriction
>> >    public OffsetRange getInitialRange(String element) {
>> >      return new OffsetRange(0, Integer.MAX_VALUE);
>> >    }
>> >
>> >    @NewTracker
>> >    public OffsetRangeTracker newTracker(OffsetRange range) {
>> >      return new OffsetRangeTracker(range);
>> >    }
>> >  }
>> >
>> >====== What I'm asking ======
>> >So, I'd like to ask for help integrating SDF into Spark, Flink and Apex
>> >runners from people who are intimately familiar with them - specifically,
>> >I was hoping best-case I could nerd-snipe some of you into taking over the
>> >integration of SDF with your favorite runner ;)
>> >
>> >The proper set of people seems to be +Aljoscha Krettek
>> ><[email protected]> +Maximilian Michels
>> ><[email protected]>
>> >[email protected] <[email protected]> +Amit Sela
>> ><[email protected]> +Thomas
>> >Weise unless I forgot somebody.
>> >
>> >Average-case, I was looking for runner-specific guidance on how to do it
>> >myself.
>> >
>> >====== If you want to help ======
>> >If somebody decides to take this over in my absence (I'll be mostly gone
>> >for ~the next month), the best people to ask for implementation advice
>> >are +Kenn Knowles <[email protected]> and +Daniel Mills <[email protected]>.
>> >
>> >For reference, here's how SDF is implemented in the direct runner:
>> >- Direct runner overrides
>> ><https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
>> > ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
>> ><https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
>> >(common transform expansion)
>> >- SplittableParDo uses two runner-specific primitive transforms:
>> >"GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner
>> >overrides the first one like this
>> ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
>> >and directly implements evaluation of the second one like this
>> ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
>> >using runner hooks introduced in this PR
>> ><https://github.com/apache/beam/pull/1824>. At the core of the hooks is
>> >"ProcessFn" which is like a regular DoFn but has to be prepared at runtime
>> >with some hooks (state, timers, and runner access to RestrictionTracker)
>> >before you invoke it. I added a convenience implementation of the hook
>> >mimicking behavior of UnboundedSource.
>> >- The relevant runner-agnostic tests are in SplittableDoFnTest
>> ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
>> >
>> >That's all it takes, really - the runner has to implement these two
>> >transforms. When I looked at Spark and Flink runners, it was not quite
>> >clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g.
>> >Spark runner currently doesn't use KeyedWorkItem at all - but it seems
>> >definitely possible.
>> >
>> >Thanks!
>>
>>
>>
>>
>> --
>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
>>
>> [email protected]
>> +49-(0)30-55599146
>>
>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
>> Managing Directors: Kostas Tzoumas, Stephan Ewen
>>
>
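
For readers following the thread, the claim loop in the quoted CountFn teaser can be illustrated without any Beam dependency. The sketch below is only an approximation under stated assumptions: SketchRange, SketchTracker and ClaimLoopSketch are hypothetical stand-ins written for this note, not Beam's OffsetRange or OffsetRangeTracker, and the checkpoint behavior (keep the claimed prefix, hand back the remainder) is my reading of how a runner would interrupt the loop so that the rest of the range can be resumed later.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical half-open offset range [from, to); not Beam's OffsetRange. */
final class SketchRange {
    final long from;
    final long to;

    SketchRange(long from, long to) {
        if (from > to) {
            throw new IllegalArgumentException("from must be <= to");
        }
        this.from = from;
        this.to = to;
    }
}

/** Hypothetical tracker that hands out offsets from a range one claim at a time. */
final class SketchTracker {
    private SketchRange range;
    private long lastClaimed;
    private boolean claimedAny = false;

    SketchTracker(SketchRange range) {
        this.range = range;
    }

    SketchRange currentRestriction() {
        return range;
    }

    /** Claims offset i if it is still inside the current range. */
    boolean tryClaim(long i) {
        if (i >= range.to) {
            return false;
        }
        lastClaimed = i;
        claimedAny = true;
        return true;
    }

    /** Shrinks the current range to the claimed prefix and returns the remainder. */
    SketchRange checkpoint() {
        long split = claimedAny ? lastClaimed + 1 : range.from;
        SketchRange residual = new SketchRange(split, range.to);
        range = new SketchRange(range.from, split);
        return residual;
    }
}

class ClaimLoopSketch {
    /** Same loop shape as the teaser: emit offsets until a claim fails. */
    static List<Long> processOnce(SketchTracker tracker) {
        List<Long> out = new ArrayList<>();
        for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
            out.add(i);
        }
        return out;
    }

    public static void main(String[] args) {
        SketchTracker first = new SketchTracker(new SketchRange(0, 10));
        List<Long> firstOut = new ArrayList<>();
        // Claim three offsets, then simulate the runner taking a checkpoint.
        for (long i = 0; i < 3 && first.tryClaim(i); ++i) {
            firstOut.add(i);
        }
        SketchRange residual = first.checkpoint();
        System.out.println("first call emitted " + firstOut);
        System.out.println("next claim succeeds? " + first.tryClaim(3));
        // A later call resumes on the residual range with a fresh tracker.
        List<Long> resumed = processOnce(new SketchTracker(residual));
        System.out.println("resumed call emitted " + resumed);
    }
}
```

The point of the design is that the processing loop never decides on its own where to stop: it only asks the tracker for permission, so whoever owns the tracker can end the call by shrinking the range.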
