Thanks for the offers, guys! The code is finished, though. I only need
to do the last touch ups.

On Tue, Mar 28, 2017, at 09:16, JingsongLee wrote:
> Hi Aljoscha,
> I would like to work on the Flink runner with you.
> Best,JingsongLee------------------------------------------------------------------From:Jean-Baptiste
> Onofré <[email protected]>Time:2017 Mar 28 (Tue) 14:04To:dev
> <[email protected]>Subject:Re: Call for help: let's add Splittable DoFn
> to Spark, Flink and Apex runners
> Hi Aljoscha,
> 
> do you need some help on this ?
> 
> Regards
> JB
> 
> On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:
> > Hi,
> > sorry for being so slow but I’m currently traveling.
> >
> > The Flink code works but I think it could benefit from some refactoring
> > to make the code nice and maintainable.
> >
> > Best,
> > Aljoscha
> >
> > On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:
> >> I add myself on the Spark runner.
> >>
> >> Regards
> >> JB
> >>
> >> On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:
> >>> Hi all,
> >>>
> >>> Let's continue the ~bi-weekly sync-ups about state of SDF support in
> >>> Spark/Flink/Apex runners.
> >>>
> >>> Spark:
> >>> Amit, Aviem, Ismaël - when would be a good time for you; does same time
> >>> work (8am PST this Friday)? Who else would like to join?
> >>>
> >>> Flink:
> >>> I pinged the PR, but - Aljoscha, do you think it's worth discussing
> >>> anything there over a videocall?
> >>>
> >>> Apex:
> >>> Thomas - how about same time next Monday? (9:30am PST) Who else would like
> >>> to join?
> >>>
> >>> On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <[email protected]>
> >>> wrote:
> >>>
> >>>> Meeting notes:
> >>>> Me and Thomas had a video call and we pretty much walked through the
> >>>> implementation of SDF in the runner-agnostic part and in the direct 
> >>>>runner.
> >>>> Flink and Apex are pretty similar, so likely
> >>>> https://github.com/apache/beam/pull/2235 (the Flink PR) will give a very
> >>>> good guideline as to how to do this in Apex.
> >>>> Will talk again in ~2 weeks; and will involve +David Yan
> >>>> <[email protected]> who is also on Apex and currently conveniently
> >>>> works on the Google Dataflow team and, from in-person conversation, was
> >>>> interested in being involved :)
> >>>>
> >>>> On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <[email protected]>
> >>>> wrote:
> >>>>
> >>>> Thomas - yes, 9:30 works, shall we do that?
> >>>>
> >>>> JB - excellent! You can start experimenting already, using direct runner!
> >>>>
> >>>> On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <[email protected]>
> >>>> wrote:
> >>>>
> >>>> Hi Eugene,
> >>>>
> >>>> Thanks for the meeting notes !
> >>>>
> >>>> I will be in the next call and Ismaël also provided to me some updates.
> >>>>
> >>>> I will sync with Amit on Spark runner and start to experiment and test 
> >>>>SDF
> >>>> on
> >>>> the JMS IO.
> >>>>
> >>>> Thanks !
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
> >>>>> Meeting notes from today's call with Amit, Aviem and Ismaël:
> >>>>>
> >>>>> Spark has 2 types of stateful operators; a cheap one intended for
> >>>> updating
> >>>>> elements (works with state but not with timers) and an expensive one.
> >>>> I.e.
> >>>>> there's no efficient direct counterpart to Beam's keyed state model. In
> >>>>> implementation of Beam State & Timers API, Spark runner will use the
> >>>>> cheaper one for state and the expensive one for timers. So, for SDF,
> >>>> which
> >>>>> in the runner-agnostic SplittableParDo expansion needs both state and
> >>>>> timers, we'll need the expensive one - but this should be fine since 
> >>>>>with
> >>>>> SDF the bottleneck should be in the ProcessElement call itself, not in
> >>>>> splitting/scheduling it.
> >>>>>
> >>>>> For Spark batch runner, implementing SDF might be still simpler: runner
> >>>>> will just not request any checkpointing. Hard parts about SDF/batch are
> >>>>> dynamic rebalancing and size estimation APIs - they will be refined this
> >>>>> quarter, but it's ok to initially not have them.
> >>>>>
> >>>>> Spark runner might use a different expansion of SDF not involving
> >>>>> KeyedWorkItem's (i.e. not overriding the GBKIntoKeyedWorkItems
> >>>> transform),
> >>>>> though still striving to reuse as much code as possible from the 
> >>>>>standard
> >>>>> expansion implemented in SplittableParDo, at least ProcessFn.
> >>>>>
> >>>>> Testing questions:
> >>>>> - Spark runner already implements termination on
> >>>>> watermarks-reaching-infinity properly.
> >>>>> - Q: How to test that the runner actually splits? A: The code that 
> >>>>>splits
> >>>>> is in the runner-agnostic, so a runner would have to deliberately
> >>>> sabotage
> >>>>> it in order to break it - unlikely. Also, for semantics we have
> >>>>> runner-agnostic ROS tests; but at some point will need performance tests
> >>>>> too.
> >>>>>
> >>>>> Next steps:
> >>>>> - Amit will look at the standard SplittableParDo expansion and
> >>>>> implementation in Flink and Direct runner, will write up a doc about how
> >>>> to
> >>>>> do this in Spark.
> >>>>> - Another videotalk in 2 weeks to check on progress/issues.
> >>>>>
> >>>>> Thanks all!
> >>>>>
> >>>>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <[email protected]>
> >>>>> wrote:
> >>>>>
> >>>>>> Yes, Monday morning works! How about also 8am PST, same Hangout link -
> >>>>>> does that work for you?
> >>>>>>
> >>>>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <[email protected]>
> >>>>>> wrote:
> >>>>>>
> >>>>>> Eugene,
> >>>>>>
> >>>>>> I cannot make it for the call today. Would Monday morning work for you
> >>>> to
> >>>>>> discuss the Apex changes?
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <
> >>>>>> [email protected]> wrote:
> >>>>>>
> >>>>>>> Hi! Please feel free to join this call, but I think we'd be mostly
> >>>>>>> discussing how to do it in the Spark runner in particular; so we'll
> >>>>>>> probably need another call for Apex anyway.
> >>>>>>>
> >>>>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hi Eugene,
> >>>>>>>>
> >>>>>>>> This would work for me also. Please let me know if you want to keep
> >>>> the
> >>>>>>>> Apex related discussion separate or want me to join this call.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Thomas
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <
> >>>>>>>> [email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Sure, Friday morning sounds good. How about 9am Friday PST, at
> >>>>>>> videocall
> >>>>>>>> by
> >>>>>>>>> link
> >>>>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn
> >>>>>>> ?
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <[email protected]>
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> PST mornings are better, because they are evening/nights for me.
> >>>>>>> Friday
> >>>>>>>>>> would work-out best for me.
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> >>>>>>>>>> <[email protected]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Awesome!!!
> >>>>>>>>>>>
> >>>>>>>>>>> Amit - remind me your time zone? JB, do you want to join?
> >>>>>>>>>>> I'm free this week all afternoons (say after 2pm) in Pacific
> >>>>>> Time,
> >>>>>>>> and
> >>>>>>>>>>> mornings of Wed & Fri. We'll probably need half an hour to an
> >>>>>> hour.
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <
> >>>>>>>> [email protected]>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I whipped up a quick version for Flink that seems to work:
> >>>>>>>>>>>> https://github.com/apache/beam/pull/2235
> >>>>>>>>>>>>
> >>>>>>>>>>>> There are still two failing tests, as described in the PR.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> >>>>>>>>>>>>> +1 for a video call. I think it should be pretty straight
> >>>>>>> forward
> >>>>>>>>> for
> >>>>>>>>>>> the
> >>>>>>>>>>>>> Spark runner after the work on read from UnboundedSource and
> >>>>>>>> after
> >>>>>>>>>>>>> GroupAlsoByWindow, but from my experience such a call could
> >>>>>>> move
> >>>>>>>> us
> >>>>>>>>>>>>> forward
> >>>>>>>>>>>>> fast enough.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <
> >>>>>>>> [email protected]
> >>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Let us continue working on this. I am back from various
> >>>>>>> travels
> >>>>>>>>> and
> >>>>>>>>>>> am
> >>>>>>>>>>>>>> eager to help.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Amit, JB - would you like to perhaps have a videocall to
> >>>>>> hash
> >>>>>>>>> this
> >>>>>>>>>>> out
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>> the Spark runner?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Aljoscha - are the necessary Flink changes done / or is the
> >>>>>>>> need
> >>>>>>>>>> for
> >>>>>>>>>>>> them
> >>>>>>>>>>>>>> obviated by using the (existing) runner-facing state/timer
> >>>>>>>> APIs?
> >>>>>>>>>>>> Should we
> >>>>>>>>>>>>>> have a videocall too?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thomas - what do you think about getting this into Apex
> >>>>>>> runner?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> (I think videocalls will allow to make rapid progress, but
> >>>>>>> it's
> >>>>>>>>>>>> probably a
> >>>>>>>>>>>>>> better idea to keep them separate since they'll involve a
> >>>>>> lot
> >>>>>>>> of
> >>>>>>>>>>>>>> runner-specific details)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> PS - The completion of this in Dataflow streaming runner is
> >>>>>>>>>> currently
> >>>>>>>>>>>>>> waiting only on having a small service-side change
> >>>>>>> implemented
> >>>>>>>>> and
> >>>>>>>>>>>> rolled
> >>>>>>>>>>>>>> out for termination of streaming jobs.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <
> >>>>>>>> [email protected]>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I recommend proceeding with the runner-facing state & timer
> >>>>>>>> APIs;
> >>>>>>>>>>> they
> >>>>>>>>>>>> are
> >>>>>>>>>>>>>> lower-level and more appropriate for this. All runners
> >>>>>>> provide
> >>>>>>>>> them
> >>>>>>>>>>> or
> >>>>>>>>>>>> use
> >>>>>>>>>>>>>> runners/core implementations, as they are needed for
> >>>>>>>> triggering.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <
> >>>>>>>>>>>> [email protected]>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks Aljoscha!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Minor note: I'm not familiar with what level of support for
> >>>>>>>>> timers
> >>>>>>>>>>>> Flink
> >>>>>>>>>>>>>> currently has - however SDF in Direct and Dataflow runner
> >>>>>>>>> currently
> >>>>>>>>>>>> does
> >>>>>>>>>>>>>> not use the user-facing state/timer APIs - rather, it uses
> >>>>>>> the
> >>>>>>>>>>>>>> runner-facing APIs (StateInternals and TimerInternals) -
> >>>>>>>> perhaps
> >>>>>>>>>>> Flink
> >>>>>>>>>>>>>> already implements these. We may want to change this, but
> >>>>>> for
> >>>>>>>> now
> >>>>>>>>>>> it's
> >>>>>>>>>>>> good
> >>>>>>>>>>>>>> enough (besides, SDF uses watermark holds, which are not
> >>>>>>>>> supported
> >>>>>>>>>> by
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> user-facing state API yet).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> >>>>>>>>>>>>>> [email protected]> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I've wanted to do this for a while now but was waiting for
> >>>>>>> the
> >>>>>>>>>> Flink
> >>>>>>>>>>>> 1.2
> >>>>>>>>>>>>>> release (which happened this week)! There's some
> >>>>>> prerequisite
> >>>>>>>>> work
> >>>>>>>>>> to
> >>>>>>>>>>>> be
> >>>>>>>>>>>>>> done on the Flink runner: we'll move to the new timer
> >>>>>>>> interfaces
> >>>>>>>>>>>> introduced
> >>>>>>>>>>>>>> in Flink 1.2 and implement support for both the user facing
> >>>>>>>> state
> >>>>>>>>>> and
> >>>>>>>>>>>> timer
> >>>>>>>>>>>>>> APIs. This should make implementation of SDF easier.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <
> >>>>>>>>>>> [email protected]
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks! Looking forward to this work.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <
> >>>>>>>>>> [email protected]
> >>>>>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the update Eugene.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I will work on the spark runner with Amit.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards
> >>>>>>>>>>>>>> JB
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
> >>>>>>>>>>>>>> <[email protected]> wrote:
> >>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn
> >>>>>>>>>>>>>>> http://s.apache.org/splittable-do-fn to Dataflow
> >>>>>> streaming
> >>>>>>>>>> runner*,
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>> very excited about that. There's only 1 PR
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1898> remaining,
> >>>>>> plus
> >>>>>>>>>> enabling
> >>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>> tests.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> * (batch runner is much harder because it's not yet quite
> >>>>>>>> clear
> >>>>>>>>> to
> >>>>>>>>>>> me
> >>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>> to properly implement liquid sharding
> >>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>> https://cloud.google.com/blog/big-data/2016/05/no-shard-
> >>>>>>>>> left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>> SDF - and the current API is not ready for that yet)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> After implementing all the runner-agnostic parts of
> >>>>>>> Splittable
> >>>>>>>>>>> DoFn, I
> >>>>>>>>>>>>>>> found them quite easy to integrate into Dataflow streaming
> >>>>>>>>> runner,
> >>>>>>>>>>> and
> >>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>> think this means it should be easy to integrate into other
> >>>>>>>>> runners
> >>>>>>>>>>>> too.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ====== Why it'd be cool ======
> >>>>>>>>>>>>>>> The general benefits of SDF are well-described in the
> >>>>>> design
> >>>>>>>> doc
> >>>>>>>>>>>>>>> (linked
> >>>>>>>>>>>>>>> above).
> >>>>>>>>>>>>>>> As for right now - if we integrated SDF with all runners,
> >>>>>>> it'd
> >>>>>>>>>>> already
> >>>>>>>>>>>>>>> enable us to start greatly simplifying the code of
> >>>>>> existing
> >>>>>>>>>>> streaming
> >>>>>>>>>>>>>>> connectors (CountingInput, Kafka, Pubsub, JMS) and writing
> >>>>>>> new
> >>>>>>>>>>>>>>> connectors
> >>>>>>>>>>>>>>> (e.g. a really nice one to implement would be "directory
> >>>>>>>>> watcher",
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>> continuously returns new files in a directory).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As a teaser, here's the complete implementation of an
> >>>>>>>> "unbounded
> >>>>>>>>>>>>>>> counter" I
> >>>>>>>>>>>>>>> used for my test of Dataflow runner integration:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>  class CountFn extends DoFn<String, String> {
> >>>>>>>>>>>>>>>    @ProcessElement
> >>>>>>>>>>>>>>> public ProcessContinuation process(ProcessContext c,
> >>>>>>>>>>>> OffsetRangeTracker
> >>>>>>>>>>>>>>> tracker) {
> >>>>>>>>>>>>>>>      for (int i = tracker.currentRestriction().getFrom();
> >>>>>>>>>>>>>>> tracker.tryClaim(i); ++i) c.output(i);
> >>>>>>>>>>>>>>>      return resume();
> >>>>>>>>>>>>>>>    }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>    @GetInitialRestriction
> >>>>>>>>>>>>>>>    public OffsetRange getInitialRange(String element) {
> >>>>>>>> return
> >>>>>>>>>> new
> >>>>>>>>>>>>>>> OffsetRange(0, Integer.MAX_VALUE); }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>    @NewTracker
> >>>>>>>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange
> >>>>>> range) {
> >>>>>>>>>> return
> >>>>>>>>>>>> new
> >>>>>>>>>>>>>>> OffsetRangeTracker(range); }
> >>>>>>>>>>>>>>>  }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ====== What I'm asking ======
> >>>>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into Spark,
> >>>>>>> Flink
> >>>>>>>>> and
> >>>>>>>>>>>> Apex
> >>>>>>>>>>>>>>> runners from people who are intimately familiar with them
> >>>>>> -
> >>>>>>>>>>>>>>> specifically, I
> >>>>>>>>>>>>>>> was hoping best-case I could nerd-snipe some of you into
> >>>>>>>> taking
> >>>>>>>>>> over
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> integration of SDF with your favorite runner ;)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek
> >>>>>>>>>>>>>>> <[email protected]> +Maximilian Michels
> >>>>>>>>>>>>>>> <[email protected]>
> >>>>>>>>>>>>>>> [email protected] <[email protected]> +Amit Sela
> >>>>>>>>>>>>>>> <[email protected]> +Thomas
> >>>>>>>>>>>>>>> Weise unless I forgot somebody.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Average-case, I was looking for runner-specific guidance
> >>>>>> on
> >>>>>>>> how
> >>>>>>>>> to
> >>>>>>>>>>> do
> >>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>> myself.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ====== If you want to help ======
> >>>>>>>>>>>>>>> If somebody decides to take this over, in my absence (I'll
> >>>>>>> be
> >>>>>>>>>> mostly
> >>>>>>>>>>>>>>> gone
> >>>>>>>>>>>>>>> for ~the next month)., the best people to ask for
> >>>>>>>> implementation
> >>>>>>>>>>>>>>> advice are +Kenn
> >>>>>>>>>>>>>>> Knowles <[email protected]> and +Daniel Mills <
> >>>>>>> [email protected]
> >>>>>>>>>
> >>>>>>>>> .
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For reference, here's how SDF is implemented in the direct
> >>>>>>>>> runner:
> >>>>>>>>>>>>>>> - Direct runner overrides
> >>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>> https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b
> >>>>>>>>> 74a62d9b24/runners/direct-java/src/main/java/org/apache/
> >>>>>>>>> beam/runners/direct/ParDoMultiOverrideFactory.java
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it with
> >>>>>>>>>>> SplittableParDo
> >>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-
> >>>>>>>>> java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (common
> >>>>>>>>>>>>>>> transform expansion)
> >>>>>>>>>>>>>>> - SplittableParDo uses two runner-specific primitive
> >>>>>>>> transforms:
> >>>>>>>>>>>>>>> "GBKIntoKeyedWorkItems" and "SplittableProcessElements".
> >>>>>>>> Direct
> >>>>>>>>>>> runner
> >>>>>>>>>>>>>>> overrides the first one like this
> >>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> >>>>>>>>> beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
> >>>>>>>>>>>>>>> ,
> >>>>>>>>>>>>>>> and directly implements evaluation of the second one like
> >>>>>>> this
> >>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> >>>>>>>>> beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
> >>>>>>>>>>>>>>> ,
> >>>>>>>>>>>>>>> using runner hooks introduced in this PR
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the core
> >>>>>> of
> >>>>>>>> the
> >>>>>>>>>>> hooks
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>>> "ProcessFn" which is like a regular DoFn but has to be
> >>>>>>>> prepared
> >>>>>>>>> at
> >>>>>>>>>>>>>>> runtime
> >>>>>>>>>>>>>>> with some hooks (state, timers, and runner access to
> >>>>>>>>>>>>>>> RestrictionTracker)
> >>>>>>>>>>>>>>> before you invoke it. I added a convenience implementation
> >>>>>>> of
> >>>>>>>>> the
> >>>>>>>>>>> hook
> >>>>>>>>>>>>>>> mimicking behavior of UnboundedSource.
> >>>>>>>>>>>>>>> - The relevant runner-agnostic tests are in
> >>>>>>> SplittableDoFnTest
> >>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> >>>>>>>>> 99024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/
> >>>>>>>>> transforms/SplittableDoFnTest.java
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> .
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> That's all it takes, really - the runner has to implement
> >>>>>>>> these
> >>>>>>>>>> two
> >>>>>>>>>>>>>>> transforms. When I looked at Spark and Flink runners, it
> >>>>>> was
> >>>>>>>> not
> >>>>>>>>>>> quite
> >>>>>>>>>>>>>>> clear to me how to implement the GBKIntoKeyedWorkItems
> >>>>>>>>> transform,
> >>>>>>>>>>> e.g.
> >>>>>>>>>>>>>>> Spark runner currently doesn't use KeyedWorkItem at all -
> >>>>>>> but
> >>>>>>>> it
> >>>>>>>>>>> seems
> >>>>>>>>>>>>>>> definitely possible.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [email protected]
> >>>>>>>>>>>>>> +49-(0)30-55599146 <+49%2030%2055599146> <+49%2030%2055599146>
> >>>>>> <+49%2030%2055599146>
> >>>>>>>> <+49%2030%2055599146> <+49%2030%2055599146>
> >>>>>>>>>> <+49%2030%2055599146>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
> >>>>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>> --
> >>>> Jean-Baptiste Onofré
> >>>> [email protected]
> >>>> http://blog.nanthrax.net
> >>>> Talend - http://www.talend.com
> >>>>
> >>>>
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> [email protected]
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> 
> -- 
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com

Reply via email to