Great! So we'll use the hangout you added here, see you then.

On Wed, Mar 15, 2017 at 7:22 PM Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:
> Amit - 8am is fine with me, let's do that.
>
> On Wed, Mar 15, 2017 at 6:00 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> > Hi,
> >
> > Anyway, I hope it will result in some notes on the mailing list, as it
> > could be helpful.
> >
> > I'm not against a video call to move forward, but, from a community
> > perspective, we should always provide minute notes on the mailing list.
> >
> > Unfortunately, next Friday, I will still be in China, so it is not
> > possible to join (even though I would have liked to participate :().
> >
> > Regards
> > JB
> >
> > On 03/15/2017 07:45 PM, Amit Sela wrote:
> > > I have dinner at 9am... which doesn't sound like a real thing if you
> > > forget about timezones :)
> > > How about 8am? Or something later, like 12pm midday?
> > > Apex can take the 9am time slot ;-)
> > >
> > > On Wed, Mar 15, 2017 at 4:28 AM Eugene Kirpichov
> > > <kirpic...@google.com.invalid> wrote:
> > >
> > >> Hi! Please feel free to join this call, but I think we'd be mostly
> > >> discussing how to do it in the Spark runner in particular; so we'll
> > >> probably need another call for Apex anyway.
> > >>
> > >> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <t...@apache.org> wrote:
> > >>
> > >>> Hi Eugene,
> > >>>
> > >>> This would work for me also. Please let me know if you want to keep
> > >>> the Apex-related discussion separate or want me to join this call.
> > >>>
> > >>> Thanks,
> > >>> Thomas
> > >>>
> > >>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <
> > >>> kirpic...@google.com.invalid> wrote:
> > >>>
> > >>>> Sure, Friday morning sounds good. How about 9am Friday PST, at the
> > >>>> videocall by link
> > >>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?
> > >>>>
> > >>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <amitsel...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> PST mornings are better, because they are evenings/nights for me.
> > >>>>> Friday would work out best for me.
> > >>>>>
> > >>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> > >>>>> <kirpic...@google.com.invalid> wrote:
> > >>>>>
> > >>>>>> Awesome!!!
> > >>>>>>
> > >>>>>> Amit - remind me of your time zone? JB, do you want to join?
> > >>>>>> I'm free this week all afternoons (say after 2pm) in Pacific Time,
> > >>>>>> and mornings of Wed & Fri. We'll probably need half an hour to an
> > >>>>>> hour.
> > >>>>>>
> > >>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <
> > >>>>>> aljos...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> I whipped up a quick version for Flink that seems to work:
> > >>>>>>> https://github.com/apache/beam/pull/2235
> > >>>>>>>
> > >>>>>>> There are still two failing tests, as described in the PR.
> > >>>>>>>
> > >>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > >>>>>>>> +1 for a video call. I think it should be pretty straightforward
> > >>>>>>>> for the Spark runner after the work on read from UnboundedSource
> > >>>>>>>> and after GroupAlsoByWindow, but from my experience such a call
> > >>>>>>>> could move us forward fast enough.
> > >>>>>>>>
> > >>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <
> > >>>>>>>> kirpic...@google.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi all,
> > >>>>>>>>>
> > >>>>>>>>> Let us continue working on this. I am back from various travels
> > >>>>>>>>> and am eager to help.
> > >>>>>>>>>
> > >>>>>>>>> Amit, JB - would you like to perhaps have a videocall to hash
> > >>>>>>>>> this out for the Spark runner?
> > >>>>>>>>>
> > >>>>>>>>> Aljoscha - are the necessary Flink changes done / or is the
> > >>>>>>>>> need for them obviated by using the (existing) runner-facing
> > >>>>>>>>> state/timer APIs? Should we have a videocall too?
> > >>>>>>>>>
> > >>>>>>>>> Thomas - what do you think about getting this into the Apex
> > >>>>>>>>> runner?
> > >>>>>>>>>
> > >>>>>>>>> (I think videocalls will allow us to make rapid progress, but
> > >>>>>>>>> it's probably a better idea to keep them separate since they'll
> > >>>>>>>>> involve a lot of runner-specific details)
> > >>>>>>>>>
> > >>>>>>>>> PS - The completion of this in the Dataflow streaming runner is
> > >>>>>>>>> currently waiting only on having a small service-side change
> > >>>>>>>>> implemented and rolled out for termination of streaming jobs.
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <
> > >>>>>>>>> k...@google.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> I recommend proceeding with the runner-facing state & timer
> > >>>>>>>>> APIs; they are lower-level and more appropriate for this. All
> > >>>>>>>>> runners provide them or use runners/core implementations, as
> > >>>>>>>>> they are needed for triggering.
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <
> > >>>>>>>>> kirpic...@google.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Thanks Aljoscha!
> > >>>>>>>>>
> > >>>>>>>>> Minor note: I'm not familiar with what level of support for
> > >>>>>>>>> timers Flink currently has - however, SDF in the Direct and
> > >>>>>>>>> Dataflow runners currently does not use the user-facing
> > >>>>>>>>> state/timer APIs - rather, it uses the runner-facing APIs
> > >>>>>>>>> (StateInternals and TimerInternals) - perhaps Flink already
> > >>>>>>>>> implements these. We may want to change this, but for now it's
> > >>>>>>>>> good enough (besides, SDF uses watermark holds, which are not
> > >>>>>>>>> supported by the user-facing state API yet).
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> > >>>>>>>>> aljos...@data-artisans.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the motivation, Eugene! :-)
> > >>>>>>>>>
> > >>>>>>>>> I've wanted to do this for a while now but was waiting for the
> > >>>>>>>>> Flink 1.2 release (which happened this week)! There's some
> > >>>>>>>>> prerequisite work to be done on the Flink runner: we'll move to
> > >>>>>>>>> the new timer interfaces introduced in Flink 1.2 and implement
> > >>>>>>>>> support for both the user-facing state and timer APIs. This
> > >>>>>>>>> should make implementation of SDF easier.
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <
> > >>>>>>>>> kirpic...@google.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Thanks! Looking forward to this work.
> > >>>>>>>>>
> > >>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <
> > >>>>>>>>> j...@nanthrax.net> wrote:
> > >>>>>>>>>
> > >>>>>>>>> Thanks for the update Eugene.
> > >>>>>>>>>
> > >>>>>>>>> I will work on the Spark runner with Amit.
> > >>>>>>>>>
> > >>>>>>>>> Regards
> > >>>>>>>>> JB
> > >>>>>>>>>
> > >>>>>>>>> On Feb 7, 2017, at 19:12, Eugene Kirpichov
> > >>>>>>>>> <kirpic...@google.com.INVALID> wrote:
> > >>>>>>>>>> Hello,
> > >>>>>>>>>>
> > >>>>>>>>>> I'm almost done adding support for Splittable DoFn
> > >>>>>>>>>> http://s.apache.org/splittable-do-fn to the Dataflow streaming
> > >>>>>>>>>> runner*, and very excited about that. There's only 1 PR
> > >>>>>>>>>> <https://github.com/apache/beam/pull/1898> remaining, plus
> > >>>>>>>>>> enabling some tests.
> > >>>>>>>>>>
> > >>>>>>>>>> * (the batch runner is much harder because it's not yet quite
> > >>>>>>>>>> clear to me how to properly implement liquid sharding
> > >>>>>>>>>> <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> > >>>>>>>>>> with SDF - and the current API is not ready for that yet)
> > >>>>>>>>>>
> > >>>>>>>>>> After implementing all the runner-agnostic parts of Splittable
> > >>>>>>>>>> DoFn, I found them quite easy to integrate into the Dataflow
> > >>>>>>>>>> streaming runner, and I think this means it should be easy to
> > >>>>>>>>>> integrate into other runners too.
> > >>>>>>>>>>
> > >>>>>>>>>> ====== Why it'd be cool ======
> > >>>>>>>>>> The general benefits of SDF are well described in the design
> > >>>>>>>>>> doc (linked above).
> > >>>>>>>>>> As for right now - if we integrated SDF with all runners, it'd
> > >>>>>>>>>> already enable us to start greatly simplifying the code of
> > >>>>>>>>>> existing streaming connectors (CountingInput, Kafka, Pubsub,
> > >>>>>>>>>> JMS) and writing new connectors (e.g. a really nice one to
> > >>>>>>>>>> implement would be a "directory watcher" that continuously
> > >>>>>>>>>> returns new files in a directory).
> > >>>>>>>>>>
> > >>>>>>>>>> As a teaser, here's the complete implementation of an
> > >>>>>>>>>> "unbounded counter" I used for my test of the Dataflow runner
> > >>>>>>>>>> integration:
> > >>>>>>>>>>
> > >>>>>>>>>> class CountFn extends DoFn<String, String> {
> > >>>>>>>>>>   @ProcessElement
> > >>>>>>>>>>   public ProcessContinuation process(
> > >>>>>>>>>>       ProcessContext c, OffsetRangeTracker tracker) {
> > >>>>>>>>>>     for (int i = tracker.currentRestriction().getFrom();
> > >>>>>>>>>>         tracker.tryClaim(i); ++i) {
> > >>>>>>>>>>       c.output(String.valueOf(i));
> > >>>>>>>>>>     }
> > >>>>>>>>>>     return resume();
> > >>>>>>>>>>   }
> > >>>>>>>>>>
> > >>>>>>>>>>   @GetInitialRestriction
> > >>>>>>>>>>   public OffsetRange getInitialRange(String element) {
> > >>>>>>>>>>     return new OffsetRange(0, Integer.MAX_VALUE);
> > >>>>>>>>>>   }
> > >>>>>>>>>>
> > >>>>>>>>>>   @NewTracker
> > >>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange range) {
> > >>>>>>>>>>     return new OffsetRangeTracker(range);
> > >>>>>>>>>>   }
> > >>>>>>>>>> }
> > >>>>>>>>>>
> > >>>>>>>>>> ====== What I'm asking ======
> > >>>>>>>>>> So, I'd like to ask for help integrating SDF into the Spark,
> > >>>>>>>>>> Flink and Apex runners from people who are intimately familiar
> > >>>>>>>>>> with them - specifically, I was hoping best-case I could
> > >>>>>>>>>> nerd-snipe some of you into taking over the integration of SDF
> > >>>>>>>>>> with your favorite runner ;)
> > >>>>>>>>>>
> > >>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek
> > >>>>>>>>>> <aljos...@data-artisans.com> +Maximilian Michels
> > >>>>>>>>>> <m...@data-artisans.com> +ieme...@gmail.com +Amit Sela
> > >>>>>>>>>> <amitsel...@gmail.com> +Thomas Weise, unless I forgot
> > >>>>>>>>>> somebody.
> > >>>>>>>>>>
> > >>>>>>>>>> Average-case, I was looking for runner-specific guidance on
> > >>>>>>>>>> how to do it myself.
> > >>>>>>>>>>
> > >>>>>>>>>> ====== If you want to help ======
> > >>>>>>>>>> If somebody decides to take this over in my absence (I'll be
> > >>>>>>>>>> mostly gone for ~the next month), the best people to ask for
> > >>>>>>>>>> implementation advice are +Kenn Knowles <k...@google.com> and
> > >>>>>>>>>> +Daniel Mills <mil...@google.com>.
> > >>>>>>>>>>
> > >>>>>>>>>> For reference, here's how SDF is implemented in the direct
> > >>>>>>>>>> runner:
> > >>>>>>>>>> - The direct runner overrides
> > >>>>>>>>>> <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
> > >>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it with
> > >>>>>>>>>> SplittableParDo
> > >>>>>>>>>> <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
> > >>>>>>>>>> (common transform expansion).
> > >>>>>>>>>> - SplittableParDo uses two runner-specific primitive
> > >>>>>>>>>> transforms: "GBKIntoKeyedWorkItems" and
> > >>>>>>>>>> "SplittableProcessElements".
> > >>>>>>>>>> The direct runner overrides the first one like this
> > >>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
> > >>>>>>>>>> and directly implements evaluation of the second one like this
> > >>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
> > >>>>>>>>>> using runner hooks introduced in this PR
> > >>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the core of the
> > >>>>>>>>>> hooks is "ProcessFn", which is like a regular DoFn but has to
> > >>>>>>>>>> be prepared at runtime with some hooks (state, timers, and
> > >>>>>>>>>> runner access to the RestrictionTracker) before you invoke it.
> > >>>>>>>>>> I added a convenience implementation of the hook mimicking the
> > >>>>>>>>>> behavior of UnboundedSource.
> > >>>>>>>>>> - The relevant runner-agnostic tests are in SplittableDoFnTest
> > >>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> > >>>>>>>>>>
> > >>>>>>>>>> That's all it takes, really - the runner has to implement
> > >>>>>>>>>> these two transforms.
> > >>>>>>>>>> When I looked at the Spark and Flink runners, it was not
> > >>>>>>>>>> quite clear to me how to implement the GBKIntoKeyedWorkItems
> > >>>>>>>>>> transform, e.g. the Spark runner currently doesn't use
> > >>>>>>>>>> KeyedWorkItem at all - but it seems definitely possible.
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks!
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> > >>>>>>>>>
> > >>>>>>>>> i...@data-artisans.com
> > >>>>>>>>> +49-(0)30-55599146
> > >>>>>>>>>
> > >>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > >>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
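The core contract behind the CountFn teaser in the thread above (an element may only be output after tryClaim() succeeds, and on checkpoint the unclaimed remainder of the restriction becomes a residual that a runner can resume later) can be sketched with a self-contained toy. The classes below are illustrative stand-ins written for this note, not Beam's actual OffsetRange/OffsetRangeTracker; in particular, the checkpoint() semantics shown here are an assumption made for illustration.

```java
// Toy stand-ins (NOT Beam's real classes) for the restriction/tracker
// contract described in the thread above.

class OffsetRange {
    final long from, to; // half-open interval [from, to)
    OffsetRange(long from, long to) { this.from = from; this.to = to; }
}

class ToyOffsetRangeTracker {
    private OffsetRange range;
    private long lastClaimed = -1; // highest offset successfully claimed

    ToyOffsetRangeTracker(OffsetRange range) { this.range = range; }

    OffsetRange currentRestriction() { return range; }

    // The caller may only output for an offset after tryClaim(offset)
    // returns true; false means the offset is outside the restriction.
    boolean tryClaim(long offset) {
        if (offset < range.from || offset >= range.to) return false;
        lastClaimed = offset;
        return true;
    }

    // Truncates the primary restriction to what was already claimed and
    // returns the residual restriction to be processed on resume.
    OffsetRange checkpoint() {
        long split = (lastClaimed < range.from) ? range.from : lastClaimed + 1;
        OffsetRange residual = new OffsetRange(split, range.to);
        range = new OffsetRange(range.from, split);
        return residual;
    }
}

public class TrackerDemo {
    public static void main(String[] args) {
        ToyOffsetRangeTracker tracker =
            new ToyOffsetRangeTracker(new OffsetRange(0, 1000));
        // Mimic CountFn's claim loop, but stop after a batch of 10 as if
        // the DoFn were about to return resume().
        long produced = 0;
        for (long i = tracker.currentRestriction().from;
             produced < 10 && tracker.tryClaim(i); ++i) {
            produced++; // a real DoFn would c.output(...) here
        }
        OffsetRange residual = tracker.checkpoint();
        System.out.println("claimed [" + tracker.currentRestriction().from
            + ", " + tracker.currentRestriction().to + "), residual ["
            + residual.from + ", " + residual.to + ")");
    }
}
```

In the thread's terminology, returning resume() after such a bounded batch is what lets a runner checkpoint the tracker and schedule the residual restriction later, which is the behavior the "unbounded counter" relies on.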