Hi Eugene,

This would work for me also. Please let me know if you want to keep the
Apex-related discussion separate or want me to join this call.
Thanks,
Thomas

On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <[email protected]> wrote:
> Sure, Friday morning sounds good. How about 9am Friday PST, on a videocall
> at https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?
>
> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <[email protected]> wrote:
> >
> > PST mornings are better, because they are evenings/nights for me.
> > Friday would work out best for me.
> >
> > On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov <[email protected]> wrote:
> > >
> > > Awesome!!!
> > >
> > > Amit - remind me your time zone? JB, do you want to join?
> > > I'm free this week all afternoons (say after 2pm) in Pacific Time,
> > > and mornings of Wed & Fri. We'll probably need half an hour to an hour.
> > >
> > > On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <[email protected]> wrote:
> > > >
> > > > I whipped up a quick version for Flink that seems to work:
> > > > https://github.com/apache/beam/pull/2235
> > > >
> > > > There are still two failing tests, as described in the PR.
> > > >
> > > > On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > > > +1 for a video call. I think it should be pretty straightforward
> > > > > for the Spark runner after the work on read from UnboundedSource
> > > > > and after GroupAlsoByWindow, but from my experience such a call
> > > > > could move us forward fast enough.
> > > > >
> > > > > On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <[email protected]> wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > Let us continue working on this. I am back from various travels
> > > > > > and am eager to help.
> > > > > >
> > > > > > Amit, JB - would you like to perhaps have a videocall to hash
> > > > > > this out for the Spark runner?
> > > > > >
> > > > > > Aljoscha - are the necessary Flink changes done, or is the need
> > > > > > for them obviated by using the (existing) runner-facing
> > > > > > state/timer APIs? Should we have a videocall too?
> > > > > >
> > > > > > Thomas - what do you think about getting this into the Apex runner?
> > > > > >
> > > > > > (I think videocalls will allow us to make rapid progress, but
> > > > > > it's probably a better idea to keep them separate since they'll
> > > > > > involve a lot of runner-specific details.)
> > > > > >
> > > > > > PS - The completion of this in the Dataflow streaming runner is
> > > > > > currently waiting only on having a small service-side change
> > > > > > implemented and rolled out for termination of streaming jobs.
> > > > > >
> > > > > > On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <[email protected]> wrote:
> > > > > >
> > > > > > I recommend proceeding with the runner-facing state & timer APIs;
> > > > > > they are lower-level and more appropriate for this. All runners
> > > > > > provide them or use runners/core implementations, as they are
> > > > > > needed for triggering.
> > > > > >
> > > > > > On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]> wrote:
> > > > > >
> > > > > > Thanks Aljoscha!
> > > > > >
> > > > > > Minor note: I'm not familiar with what level of support for
> > > > > > timers Flink currently has - however, SDF in the Direct and
> > > > > > Dataflow runners currently does not use the user-facing
> > > > > > state/timer APIs - rather, it uses the runner-facing APIs
> > > > > > (StateInternals and TimerInternals) - perhaps Flink already
> > > > > > implements these. We may want to change this, but for now it's
> > > > > > good enough (besides, SDF uses watermark holds, which are not
> > > > > > supported by the user-facing state API yet).
> > > > > >
> > > > > > On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <[email protected]> wrote:
> > > > > >
> > > > > > Thanks for the motivation, Eugene! :-)
> > > > > >
> > > > > > I've wanted to do this for a while now but was waiting for the
> > > > > > Flink 1.2 release (which happened this week)! There's some
> > > > > > prerequisite work to be done on the Flink runner: we'll move to
> > > > > > the new timer interfaces introduced in Flink 1.2 and implement
> > > > > > support for both the user-facing state and timer APIs. This
> > > > > > should make implementation of SDF easier.
> > > > > >
> > > > > > On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]> wrote:
> > > > > >
> > > > > > Thanks! Looking forward to this work.
> > > > > >
> > > > > > On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:
> > > > > >
> > > > > > Thanks for the update Eugene.
> > > > > >
> > > > > > I will work on the Spark runner with Amit.
> > > > > >
> > > > > > Regards
> > > > > > JB
> > > > > >
> > > > > > On Feb 7, 2017, at 19:12, Eugene Kirpichov <[email protected]> wrote:
> > > > > > >Hello,
> > > > > > >
> > > > > > >I'm almost done adding support for Splittable DoFn
> > > > > > >(http://s.apache.org/splittable-do-fn) in the Dataflow
> > > > > > >streaming runner*, and am very excited about that. There's
> > > > > > >only 1 PR (https://github.com/apache/beam/pull/1898)
> > > > > > >remaining, plus enabling some tests.
> > > > > > >
> > > > > > >* (the batch runner is much harder because it's not yet quite
> > > > > > >clear to me how to properly implement liquid sharding
> > > > > > >(https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow)
> > > > > > >with SDF - and the current API is not ready for that yet)
> > > > > > >
> > > > > > >After implementing all the runner-agnostic parts of Splittable
> > > > > > >DoFn, I found them quite easy to integrate into the Dataflow
> > > > > > >streaming runner, and I think this means it should be easy to
> > > > > > >integrate into other runners too.
> > > > > > >
> > > > > > >====== Why it'd be cool ======
> > > > > > >The general benefits of SDF are well described in the design
> > > > > > >doc (linked above).
> > > > > > >As for right now - if we integrated SDF with all runners, it'd
> > > > > > >already enable us to start greatly simplifying the code of
> > > > > > >existing streaming connectors (CountingInput, Kafka, Pubsub,
> > > > > > >JMS) and writing new connectors (e.g. a really nice one to
> > > > > > >implement would be a "directory watcher" that continuously
> > > > > > >returns new files in a directory).
> > > > > > >
> > > > > > >As a teaser, here's the complete implementation of an
> > > > > > >"unbounded counter" I used for my test of the Dataflow runner
> > > > > > >integration:
> > > > > > >
> > > > > > > class CountFn extends DoFn<String, Integer> {
> > > > > > >   @ProcessElement
> > > > > > >   public ProcessContinuation process(
> > > > > > >       ProcessContext c, OffsetRangeTracker tracker) {
> > > > > > >     for (int i = tracker.currentRestriction().getFrom();
> > > > > > >         tracker.tryClaim(i); ++i) {
> > > > > > >       c.output(i);
> > > > > > >     }
> > > > > > >     return resume();
> > > > > > >   }
> > > > > > >
> > > > > > >   @GetInitialRestriction
> > > > > > >   public OffsetRange getInitialRange(String element) {
> > > > > > >     return new OffsetRange(0, Integer.MAX_VALUE);
> > > > > > >   }
> > > > > > >
> > > > > > >   @NewTracker
> > > > > > >   public OffsetRangeTracker newTracker(OffsetRange range) {
> > > > > > >     return new OffsetRangeTracker(range);
> > > > > > >   }
> > > > > > > }
> > > > > > >
> > > > > > >====== What I'm asking ======
> > > > > > >So, I'd like to ask for help integrating SDF into the Spark,
> > > > > > >Flink and Apex runners from people who are intimately familiar
> > > > > > >with them - specifically, I was hoping best-case I could
> > > > > > >nerd-snipe some of you into taking over the integration of SDF
> > > > > > >with your favorite runner ;)
> > > > > > >
> > > > > > >The proper set of people seems to be +Aljoscha Krettek
> > > > > > ><[email protected]>, +Maximilian Michels <[email protected]>,
> > > > > > >+Jean-Baptiste Onofré <[email protected]>, +Amit Sela
> > > > > > ><[email protected]> and +Thomas Weise, unless I forgot somebody.
> > > > > > >
> > > > > > >Average-case, I was looking for runner-specific guidance on
> > > > > > >how to do it myself.
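The CountFn teaser above hinges on the tryClaim contract: the loop may output a position only after the tracker accepts a claim for it, and must stop at the first refused claim (which is how a runner splits or checkpoints the range). A minimal self-contained sketch of that contract, not Beam's real classes; MiniOffsetRangeTracker and processRange are hypothetical names for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class TryClaimSketch {
  // Minimal stand-in for the tracker: a half-open range [from, to).
  // Claims at or past 'to' are refused, which terminates the loop.
  static class MiniOffsetRangeTracker {
    private final long to;
    MiniOffsetRangeTracker(long to) { this.to = to; }
    boolean tryClaim(long offset) {
      return offset < to;  // a real tracker also records the claim
    }
  }

  // Mirrors the shape of CountFn.process: output only offsets that were
  // successfully claimed, and stop at the first refused claim.
  static List<Long> processRange(long from, long to) {
    MiniOffsetRangeTracker tracker = new MiniOffsetRangeTracker(to);
    List<Long> out = new ArrayList<>();
    for (long i = from; tracker.tryClaim(i); ++i) {
      out.add(i);  // stands in for c.output(i)
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(processRange(0, 5));  // [0, 1, 2, 3, 4]
  }
}
```

The key property a runner relies on is that no position is ever emitted without a successful claim, so a split at any point leaves the claimed and unclaimed parts of the range cleanly separated.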
> > > > > > >
> > > > > > >====== If you want to help ======
> > > > > > >If somebody decides to take this over in my absence (I'll be
> > > > > > >mostly gone for ~the next month), the best people to ask for
> > > > > > >implementation advice are +Kenn Knowles <[email protected]> and
> > > > > > >+Daniel Mills <[email protected]>.
> > > > > > >
> > > > > > >For reference, here's how SDF is implemented in the direct
> > > > > > >runner:
> > > > > > >- The direct runner overrides ParDo.of() for a splittable DoFn
> > > > > > >(https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java)
> > > > > > >and replaces it with SplittableParDo
> > > > > > >(https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java),
> > > > > > >a common transform expansion.
> > > > > > >- SplittableParDo uses two runner-specific primitive
> > > > > > >transforms: "GBKIntoKeyedWorkItems" and
> > > > > > >"SplittableProcessElements". The direct runner overrides the
> > > > > > >first one
> > > > > > >(https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java)
> > > > > > >and directly implements evaluation of the second one
> > > > > > >(https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java),
> > > > > > >using runner hooks introduced in this PR
> > > > > > >(https://github.com/apache/beam/pull/1824).
> > > > > > >At the core of the hooks is "ProcessFn", which is like a
> > > > > > >regular DoFn but has to be prepared at runtime with some hooks
> > > > > > >(state, timers, and runner access to the RestrictionTracker)
> > > > > > >before you invoke it. I added a convenience implementation of
> > > > > > >the hook mimicking the behavior of UnboundedSource.
> > > > > > >- The relevant runner-agnostic tests are in SplittableDoFnTest
> > > > > > >(https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java).
> > > > > > >
> > > > > > >That's all it takes, really - the runner has to implement
> > > > > > >these two transforms. When I looked at the Spark and Flink
> > > > > > >runners, it was not quite clear to me how to implement the
> > > > > > >GBKIntoKeyedWorkItems transform, e.g. the Spark runner
> > > > > > >currently doesn't use KeyedWorkItem at all - but it seems
> > > > > > >definitely possible.
> > > > > > >
> > > > > > >Thanks!
> > > > > >
> > > > > > --
> > > > > > Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> > > > > >
> > > > > > [email protected]
> > > > > > +49-(0)30-55599146
> > > > > >
> > > > > > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > > > > > Managing Directors: Kostas Tzoumas, Stephan Ewen
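The ProcessFn hooks described above exist so a runner can re-invoke the splittable process() call repeatedly, resuming the restriction where the previous invocation left off (that is what `return resume()` signals in the CountFn teaser). A hedged, self-contained sketch of that re-invocation loop, not Beam code; `Continuation` and `runToCompletion` are made-up names illustrating the idea:

```java
public class ContinuationSketch {
  // Hypothetical stand-in for Beam's ProcessContinuation signal.
  enum Continuation { RESUME, STOP }

  // Models a runner driving a process() call that claims at most
  // 'batch' offsets per invocation, resuming from the position the
  // previous invocation reached; returns the number of invocations
  // needed to exhaust the range [0, rangeEnd).
  static long runToCompletion(long rangeEnd, long batch) {
    long next = 0;  // restriction position carried across invocations
    long invocations = 0;
    Continuation c = Continuation.RESUME;
    while (c == Continuation.RESUME) {
      long claimedThisCall = 0;
      while (next < rangeEnd && claimedThisCall < batch) {
        next++;  // stands in for tracker.tryClaim(...) + c.output(...)
        claimedThisCall++;
      }
      // RESUME if work remains, STOP once the range is exhausted.
      c = (next < rangeEnd) ? Continuation.RESUME : Continuation.STOP;
      invocations++;
    }
    return invocations;
  }

  public static void main(String[] args) {
    System.out.println(runToCompletion(10, 4));  // 3 invocations: 4 + 4 + 2
  }
}
```

In a real runner the carried-over position lives in per-key state (hence the GBKIntoKeyedWorkItems primitive), and the re-invocation is scheduled via timers rather than a tight loop.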