Hi,

Anyway, I hope it will result in some notes on the mailing list, as that could be helpful.

I'm not against a video call to move forward, but, from my community perspective, we should always provide meeting minutes on the mailing list.

Unfortunately, next Friday I will still be in China, so it won't be possible for me to join (even though I would have liked to participate :().

Regards
JB

On 03/15/2017 07:45 PM, Amit Sela wrote:
I have dinner at 9am... which doesn't sound like a real thing if you forget
about timezones :)
How about 8am? Or something later, like 12pm mid-day?
Apex can take the 9am time slot ;-)

On Wed, Mar 15, 2017 at 4:28 AM Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:

Hi! Please feel free to join this call, but I think we'd be mostly
discussing how to do it in the Spark runner in particular; so we'll
probably need another call for Apex anyway.

On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <t...@apache.org> wrote:

Hi Eugene,

This would work for me also. Please let me know if you want to keep the
Apex-related discussion separate or want me to join this call.

Thanks,
Thomas


On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

Sure, Friday morning sounds good. How about 9am Friday PST, on a video call at this link: https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?

On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <amitsel...@gmail.com>
wrote:

PST mornings are better, because they are evenings/nights for me. Friday would work out best for me.

On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:

Awesome!!!

Amit - remind me your time zone? JB, do you want to join?
I'm free this week all afternoons (say after 2pm) in Pacific Time, and mornings of Wed & Fri. We'll probably need half an hour to an hour.

On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <aljos...@apache.org> wrote:

I whipped up a quick version for Flink that seems to work:
https://github.com/apache/beam/pull/2235

There are still two failing tests, as described in the PR.

On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
+1 for a video call. I think it should be pretty straightforward for the Spark runner after the work on read from UnboundedSource and after GroupAlsoByWindow, but from my experience such a call could move us forward fast enough.

On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <kirpic...@google.com> wrote:

Hi all,

Let us continue working on this. I am back from various travels and am eager to help.

Amit, JB - would you like to perhaps have a videocall to hash this out for the Spark runner?

Aljoscha - are the necessary Flink changes done, or is the need for them obviated by using the (existing) runner-facing state/timer APIs? Should we have a videocall too?

Thomas - what do you think about getting this into the Apex runner?

(I think videocalls will allow us to make rapid progress, but it's probably a better idea to keep them separate since they'll involve a lot of runner-specific details)

PS - The completion of this in the Dataflow streaming runner is currently waiting only on having a small service-side change implemented and rolled out for termination of streaming jobs.

On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:

I recommend proceeding with the runner-facing state & timer APIs; they are lower-level and more appropriate for this. All runners provide them or use runners/core implementations, as they are needed for triggering.

On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <kirpic...@google.com> wrote:

Thanks Aljoscha!

Minor note: I'm not familiar with what level of support for timers Flink currently has - however, SDF in the Direct and Dataflow runners currently does not use the user-facing state/timer APIs - rather, it uses the runner-facing APIs (StateInternals and TimerInternals) - perhaps Flink already implements these. We may want to change this, but for now it's good enough (besides, SDF uses watermark holds, which are not supported by the user-facing state API yet).

On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <aljos...@data-artisans.com> wrote:

Thanks for the motivation, Eugene! :-)

I've wanted to do this for a while now but was waiting for the Flink 1.2 release (which happened this week)! There's some prerequisite work to be done on the Flink runner: we'll move to the new timer interfaces introduced in Flink 1.2 and implement support for both the user-facing state and timer APIs. This should make implementation of SDF easier.

On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <kirpic...@google.com> wrote:

Thanks! Looking forward to this work.

On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Thanks for the update Eugene.

I will work on the Spark runner with Amit.

Regards
JB

On Feb 7, 2017, at 19:12, Eugene Kirpichov
<kirpic...@google.com.INVALID> wrote:
Hello,

I'm almost done adding support for Splittable DoFn http://s.apache.org/splittable-do-fn to the Dataflow streaming runner*, and am very excited about that. There's only 1 PR <https://github.com/apache/beam/pull/1898> remaining, plus enabling some tests.

* (the batch runner is much harder because it's not yet quite clear to me how to properly implement liquid sharding <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow> with SDF - and the current API is not ready for that yet)

After implementing all the runner-agnostic parts of Splittable DoFn, I found them quite easy to integrate into the Dataflow streaming runner, and I think this means it should be easy to integrate into other runners too.

====== Why it'd be cool ======
The general benefits of SDF are well-described in the design doc (linked above).
As for right now - if we integrated SDF with all runners, it'd already enable us to start greatly simplifying the code of existing streaming connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors (e.g. a really nice one to implement would be a "directory watcher" that continuously returns new files in a directory).
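
To make the "directory watcher" idea concrete, here is a sketch of the core polling logic such a connector would need - each poll returns only the files that appeared since the previous poll. This is plain Java for illustration only, not a Beam API; `DirectoryPoller` and `pollNewFiles` are made-up names:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative polling core of a hypothetical "directory watcher" connector:
// each call to pollNewFiles() returns only files not seen on earlier calls.
class DirectoryPoller {
    private final File dir;
    private final Set<String> seen = new HashSet<>();

    DirectoryPoller(File dir) {
        this.dir = dir;
    }

    List<File> pollNewFiles() {
        List<File> fresh = new ArrayList<>();
        File[] entries = dir.listFiles();
        if (entries == null) return fresh;  // directory missing or unreadable
        for (File f : entries) {
            // Set.add returns true only the first time a name is seen.
            if (f.isFile() && seen.add(f.getName())) {
                fresh.add(f);
            }
        }
        return fresh;
    }
}
```

In an actual SDF the "seen" bookkeeping would live in the restriction/state rather than in a field, so the work could be checkpointed and resumed on another worker.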

As a teaser, here's the complete implementation of an "unbounded counter" I used for my test of the Dataflow runner integration:

 // Output type is Integer so that c.output(i) type-checks.
 class CountFn extends DoFn<String, Integer> {
   @ProcessElement
   public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
     // Output each offset we can successfully claim; stop when the tracker
     // refuses a claim (e.g. after a checkpoint) and ask to be resumed.
     for (int i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
       c.output(i);
     }
     return resume();
   }

   @GetInitialRestriction
   public OffsetRange getInitialRange(String element) {
     return new OffsetRange(0, Integer.MAX_VALUE);
   }

   @NewTracker
   public OffsetRangeTracker newTracker(OffsetRange range) {
     return new OffsetRangeTracker(range);
   }
 }
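
The for-loop above leans entirely on the tracker's claim protocol: an offset may only be processed after tryClaim succeeds, and a checkpoint simply truncates the range so further claims fail. A minimal standalone sketch of that contract (a toy illustration, not Beam's actual OffsetRangeTracker):

```java
// Toy illustration of restriction-tracker claim semantics; not Beam's API.
class ToyOffsetTracker {
    private final int from;
    private int end;             // exclusive; shrinks when a checkpoint splits the range
    private int lastClaimed = -1;

    ToyOffsetTracker(int from, int end) {
        this.from = from;
        this.end = end;
    }

    // The processing loop may only handle offset i if this returns true.
    boolean tryClaim(int i) {
        if (i >= end) return false;
        lastClaimed = i;
        return true;
    }

    // Checkpoint: keep [from, lastClaimed + 1) as done, hand back the rest
    // as a residual range [splitAt, end) for the runner to reschedule.
    int[] checkpoint() {
        int splitAt = lastClaimed + 1;
        int[] residual = {splitAt, end};
        end = splitAt;           // future tryClaim calls beyond here now fail
        return residual;
    }
}
```

The runner keeps the returned residual range and can schedule it later on any worker, which is exactly what makes the DoFn "splittable".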

====== What I'm asking ======
So, I'd like to ask for help integrating SDF into the Spark, Flink and Apex runners from people who are intimately familiar with them - specifically, I was hoping best-case I could nerd-snipe some of you into taking over the integration of SDF with your favorite runner ;)

The proper set of people seems to be +Aljoscha Krettek <aljos...@data-artisans.com>, +Maximilian Michels <m...@data-artisans.com>, +ieme...@gmail.com, +Amit Sela <amitsel...@gmail.com> and +Thomas Weise, unless I forgot somebody.

Average-case, I was looking for runner-specific guidance on how to do it myself.

====== If you want to help ======
If somebody decides to take this over in my absence (I'll be mostly gone for ~the next month), the best people to ask for implementation advice are +Kenn Knowles <k...@google.com> and +Daniel Mills <mil...@google.com>.

For reference, here's how SDF is implemented in the direct runner:
- The direct runner overrides ParDo.of() for a splittable DoFn <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java> and replaces it with SplittableParDo <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java> (common transform expansion)
- SplittableParDo uses two runner-specific primitive transforms: "GBKIntoKeyedWorkItems" and "SplittableProcessElements". The direct runner overrides the first one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>, and directly implements evaluation of the second one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>, using runner hooks introduced in this PR <https://github.com/apache/beam/pull/1824>. At the core of the hooks is "ProcessFn", which is like a regular DoFn but has to be prepared at runtime with some hooks (state, timers, and runner access to RestrictionTracker) before you invoke it. I added a convenience implementation of the hook mimicking the behavior of UnboundedSource.
- The relevant runner-agnostic tests are in SplittableDoFnTest <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.

That's all it takes, really - the runner has to implement these two transforms. When I looked at the Spark and Flink runners, it was not quite clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g. the Spark runner currently doesn't use KeyedWorkItem at all - but it seems definitely possible.

Thanks!




--
Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin

i...@data-artisans.com
+49-(0)30-55599146

Registered at Amtsgericht Charlottenburg - HRB 158244 B
Managing Directors: Kostas Tzoumas, Stephan Ewen



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
