Hi Ovidiu,

We had a "Technical Discussions" link in the website menu, but I can't find it anymore (I only see "Technical Vision").
It contained all the documents under discussion.

I agree with you that we should have an area where we store all "Technical Discussion Documents".

Let me discuss that with Frances.

Thanks!
Regards
JB

On 08/29/2016 02:50 PM, Ovidiu-Cristian MARCU wrote:
Hi everyone

Is there any repository where one can track all proposals, something like Flink 
does with this wiki [1]?

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

Thanks
Ovidiu

On 29 Aug 2016, at 12:01, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Aljoscha,

Indeed, it's something we discussed during our call.

AFAIU, it's one function of the tracker. When calling the tracker's tryClaim
(with an offset, partition id, or any kind of tracked "ID"), if the claim is
not possible, then we will update the watermark.

So the tracker is useful both to determine the "split" and to deal with the
watermark.

Regards
JB

On 08/29/2016 11:55 AM, Aljoscha Krettek wrote:
Hi,
I have another question about this: currently, unbounded sources have
special logic for determining the watermark and the system periodically
asks the sources for the current watermark. As I understood it, watermarks
are only "generated" at the sources. How will this work when sources are
implemented as a combination of DoFns and SplittableDoFns? Will
SplittableDoFns be asked for a watermark? Does this mean that watermarks
can then be "generated" at any operation?

Cheers,
Aljoscha

On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

Hi JB,

Yes, I'm assuming you're referring to the "magic" part on the transform
expansion diagram. This is indeed runner-specific, and timers+state are
likely the simplest way to do this for an SDF that does an unbounded amount
of work.

On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Anyway, from a runner perspective, we will have some kind of API (part of the
Runner API) to "orchestrate" the SDF, as we discussed during the call,
right?

Regards
JB

On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
Hi Aljoscha,
This is an excellent question! And the answer is, we don't need any new
concepts like "SDF executor" and can rely on the per-key state and timers
machinery that already exists in all runners because it's necessary to
implement windowing/triggering properly.

Note that this is already somewhat addressed in the previously posted State
and Timers proposal https://s.apache.org/beam-state , under "per-key
workflows".

Think of it this way, using the Kafka example: we'll expand it into a
transform:

(1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for
partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the
proposal/slides }
 - R is the OffsetRange restriction, which in this case will always be of
the form [startOffset, inf).
 - there'll be just 1 value per key, but we use GBK just to get access to
the per-key state/timers machinery. This may be runner-specific; maybe some
runners don't need a GBK to do that.

Now suppose the topic has two partitions, P1 and P2, and they get assigned
unique keys K1, K2.
Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)),
(K2, topic, P2, [0, inf)).
Suppose we have just 1 worker with just 1 thread. Now, how will this thread
be able to produce elements from both P1 and P2? Here's how.

The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
certain time or after a certain number of elements are output (just like
with the current UnboundedSource reading code), producing a residual
restriction R1' (basically a new start offset), put R1' into the per-key
state and set a timer T1 to resume.
Then it will process (K2, topic, P2, [0, inf)) and do the same, producing a
residual restriction R2' and setting a timer T2 to resume.
Then timer T1 will fire in the context of the key K1. The thread will call
processElement again, this time supplying R1' as the restriction; the
process repeats and after a while it checkpoints and stores R1'' into the
state of K1.
Then timer T2 will fire in the context of K2, run processElement for a
while, set a new timer and store R2'' into the state of K2.
Etc.
If partition 1 goes away, the processElement call will return "do not
resume", so a timer will not be set and instead the state associated with
K1 will be GC'd.

So basically it's almost like cooperative thread scheduling: things run for
a while, until the runner tells them to checkpoint, then they set a timer
to resume themselves, and the runner fires the timers, and the process
repeats. And, again, this only requires things that runners can already do
- state and timers, but no new concept of SDF executor (and consequently no
necessity to choose/tune how many you need).
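
To make the walk-through above concrete, here is a toy, single-threaded
simulation of it. This is only a sketch under loud assumptions: it does not
use any Beam or runner API, the class name SdfTimerLoopSketch and the method
run() are made up for illustration, per-key state is a plain map, timers are
a FIFO queue, and each partition holds a finite number of records so that the
loop terminates (a partition running out of records stands in for "partition
goes away").

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model (hypothetical, not Beam code): one thread, per-key state, resume timers. */
final class SdfTimerLoopSketch {

    /**
     * recordsPerKey: key -> number of records in that key's partition.
     * batchSize: how many records are processed before a simulated checkpoint.
     * Returns the emitted elements as "key@offset", in emission order.
     */
    static List<String> run(Map<String, Integer> recordsPerKey, int batchSize) {
        Map<String, Integer> state = new HashMap<>(); // per-key residual start offset
        Queue<String> timers = new ArrayDeque<>();    // pending "resume this key" timers
        List<String> output = new ArrayList<>();

        // Input to step (3): every key starts with the restriction [0, inf).
        for (String key : recordsPerKey.keySet()) {
            state.put(key, 0);
            timers.add(key);
        }

        // The single thread: fire one timer at a time, in the context of its key.
        while (!timers.isEmpty()) {
            String key = timers.remove();
            int start = state.get(key);
            int available = recordsPerKey.get(key);
            int end = Math.min(start + batchSize, available);
            for (int offset = start; offset < end; offset++) {
                output.add(key + "@" + offset);
            }
            if (end < available) {
                state.put(key, end); // store the residual restriction [end, inf)
                timers.add(key);     // set a timer to resume this key later
            } else {
                state.remove(key);   // "do not resume": no timer, state is dropped
            }
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("K1", 3);
        partitions.put("K2", 2);
        // Prints [K1@0, K1@1, K2@0, K2@1, K1@2]
        System.out.println(run(partitions, 2));
    }
}
```

The interleaving in the output (K1, then K2, then K1 again) is the point: one
thread makes progress on both keys without any dedicated executor per key.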

Makes sense?

On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi,
I have another question that I think wasn't addressed in the meeting. At
least it wasn't mentioned in the notes.

In the context of replacing sources by a combination of two SDFs, how do you
determine how many "SDF executor" instances you need downstream? For the
sake of argument assume that both SDFs are executed with parallelism 1 (or
one per worker). Now, if you have a file source that reads from a static
set of files the first SDF would emit the filenames while the second SDF
would receive the filenames and emit their contents. This works well and
the downstream SDF can process one filename after the other. Now, think of
something like a Kafka source. The first SDF would emit the partitions (say
4 partitions, in this example) and the second SDF would be responsible for
reading from a topic and emitting elements. Reading from one topic never
finishes so you can't process the topics in series. I think you would need
to have 4 downstream "SDF executor" instances. The question now is: how do
you determine whether you are in the first or the second situation?

Probably I'm just overlooking something and this is already dealt with
somewhere... :-)

Cheers,
Aljoscha

On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com> wrote:

Hello,

Thanks for the notes both Dan and Eugene, and for taking the time to do the
presentation and answer our questions.

I mentioned the ongoing work on dynamic scaling on Flink because I suppose
that it will address dynamic rebalancing eventually (there are multiple
changes going on for dynamic scaling).

https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4

https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8

Anyway I am far from an expert on Flink, but the Flink guys can probably
give their opinion about this and refer to a more precise document than the
ones I mentioned.

Thanks again,
Ismaël

On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Great summary Eugene and Dan.

And thanks again for the details, explanation, and discussion.

Regards
JB


On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:

Thanks for attending, everybody!

Here are meeting notes (thanks Dan!).

Q: Will SplittableDoFn enable better repartitioning of the input/output
data?
A: Not really; repartitioning is orthogonal to SDF.

Current Source API suffers from lack of composition and scalability because
we treat sources too much as metadata, not enough as data.

Q (slide with transform expansion): Who does the "magic"?
A: The runner. Checkpointing and dynamically splitting restrictions will
require collaboration with the runner.

Q: How does the runner interact with the DoFn to control the restrictions?
Is it related to the centralized job tracker etc.?
A: RestrictionTracker is a simple helper object that exists purely on the
worker while executing a single partition, and interacts with the worker
harness part of the runner. Not to be confused with the centralized job
tracker (master) - completely unrelated. Worker harness, of course,
interacts with the master in some relevant ways (e.g. Dataflow master can
tell "you're a straggler, you should split").

Q: Is this a new DoFn subclass, or how will this integrate with the
existing code?
A: It's a feature of reflection-based DoFn (https://s.apache.org/a-new-do-fn)
- just another optional parameter of type RestrictionTracker to
processElement() which is dynamically bound via reflection, so fully
backward/forward compatible, and looks to users like a regular DoFn.

Q: Why is fractionClaimed a double?
A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
rebalancing) requires a uniform way to represent progress through different
sources.

Q: Spark runner is microbatch-based, so this seems to map well onto
checkpoint/resume, right?
A: Yes; actually the Dataflow runner is, at a worker level, also
microbatch-based. The way SDF interacts with a runner will be very similar
to how a Bounded/UnboundedSource interacts with a runner.

Q: Using SDF, what would be the "packaging" of the IO?
A: Same as currently: package IO's as PTransforms and their implementation
under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g.
Datastore was recently refactored from BoundedSource to ParDo (ended up
simpler and more scalable), transparently to users.

Q: What's the timeline; what to do with the IOs currently in development?
A: Timeline is O(months). Keep doing what you're doing and working on top
of Source APIs when necessary and simple ParDo's otherwise.

Q: What's the impact for the runner writers?
A: Tentatively expected that most of the code for running an SDF will be
common to runners, with some amount of per-runner glue code, just like
GBK/windowing/triggering. Impact on Dataflow runner is larger since it
supports dynamic rebalancing in batch mode and this is the hardest part,
but for other runners shouldn't be too hard.

JB: Talend has people who can help with this: e.g. help integrate into
Spark runner, refactor IOs etc. Amit also willing to chat about supporting
SDF in Spark runner.

Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will
send a link.
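
On the fractionClaimed answer in the notes above: a small sketch of why a
plain number in 0..1 is convenient. It assumes a bounded offset range
[start, end) and a tracker that remembers the last claimed offset; the class
and method names are hypothetical and not taken from the proposal. The same
formula works whether the positions are Kafka offsets or byte positions in a
file, which is what lets job-wide logic compare progress across different
sources.

```java
/** Hypothetical helper, not part of any Beam API: progress through [start, end). */
final class FractionClaimedSketch {

    /**
     * start: first position of the range (inclusive).
     * end: end of the range (exclusive), must be greater than start.
     * lastClaimed: last position successfully claimed, or start - 1 if none.
     */
    static double fractionClaimed(long start, long end, long lastClaimed) {
        if (end <= start) {
            throw new IllegalArgumentException("range must be non-empty");
        }
        if (lastClaimed < start) {
            return 0.0; // nothing claimed yet
        }
        double claimed = (double) (lastClaimed - start + 1);
        double total = (double) (end - start);
        return Math.min(1.0, claimed / total);
    }

    public static void main(String[] args) {
        // Offsets 0..49 of [0, 100) claimed: half of the range is done.
        System.out.println(fractionClaimed(0, 100, 49)); // prints 0.5
    }
}
```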


On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene,

thanks for the reminder.

Just to prepare some topics for the call, here are some points:

1. Using SDF, what would be the "packaging" of the IO? It sounds to me
that we can keep the IO packaging style (using with* setters for the IO
configuration) and replace PTransform, Source, Reader, ... directly with
SDF. Correct?

2. What's your plan in terms of releases to include SDF? We have several
IOs in preparation and I wonder if it's worth starting to use the new
SDF API or not.

3. What's the impact for the runner writers? The runners will have to
support SDF, which could be tricky depending on the execution engine. In
the worst case where the runner can't fully support SDF, does it mean
that most of our IOs will be useless?

Just my dumb topics ;)

Thanks,
See you at 8am!

Regards
JB

On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:

Hello everybody,

Just a reminder:

The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to join
the call go to
https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
I intend to go over the proposed design and then have a free-form
discussion.

Please have a skim through the proposal doc:
https://s.apache.org/splittable-do-fn
I also made some slides that are basically a trimmed-down version of the
doc to use as a guide when conducting the meeting,
https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
.

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid>
wrote:

This is pretty cool! I'll be there too. (unless the hangout gets too full
-- if so, I'll drop out in favor of others who aren't lucky enough to get
to talk to Eugene all the time.)

On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com>
wrote:

+1 I'll join

On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com>
wrote:

+ 1, me2

On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:

+1 as in I'll join ;-)

On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

Sounds good, thanks!
Then Friday Aug 19th it is, 8am-9am PST,
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn

On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi

Unfortunately I will be in Ireland on August 15th. What about Friday 19th?

Regards
JB

On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
<kirpic...@google.com.INVALID> wrote:

Hi JB,

Sounds great, does the suggested time over videoconference work for you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene

May we talk together next week? I like the proposal. I would just need
some details for my understanding.

Thanks
Regards
JB

On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
<kirpic...@google.com.INVALID> wrote:

Hi JB,

What are your thoughts on this?

I'm also thinking of having a virtual meeting to explain more about this
proposal if necessary, since I understand it is a lot to digest.

How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
(link:
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
- I confirmed that it can be joined without being logged into a Google
account)

Who'd be interested in attending, and does this time/date work for
people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

Hi JB, thanks for reading and for your comments!

It sounds like you are concerned about continued support for existing IO's
people have developed, and about backward compatibility?

We do not need to remove the Source API, and all existing Source-based
connectors will continue to work [though the document proposes at some
point to make Read.from(Source) translate to a wrapper SDF under the
hood, to exercise the feature more and to make sure that it is strictly
more powerful - but this is an optional implementation detail].

Perhaps the document phrases this too strongly - "replacing the Source
API": a better phrasing would be "introducing a new API so powerful and
easy-to-use that hopefully people will choose it over the Source API all
the time, even though they don't have to" :) And we can discuss whether or
not to actually deprecate/remove the Source API at some point down the
road, once it becomes clear whether this is the case or not.

To give more context: this proposal came out of discussions within the SDK
team over the past ~1.5 years, before the Beam project existed, on how to
make major improvements to the Source API; perhaps it will clarify things
if I give a history of the ideas discussed:
- The first idea was to introduce a Read.from(PCollection<Source>)
transform while keeping the Source API intact - this, given appropriate
implementation, would solve most of the scalability and composability
issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
+ Read.from().
- Then we figured that the Source class is an unnecessary abstraction, as
it simply holds data. What if we only had a Reader<S, B> class where S is
the source type and B the output type? Then connectors would be something
like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of Source are useful to
ParDo's as well: e.g. ability to report progress when processing a very
heavy element, or ability to produce very large output in parallel.
- The two previous bullets were already hinting that the Read.using()
primitive might not be so special: it just takes S and produces B: isn't
that what a ParDo does, plus some source magic, minus the convenience of
c.output() vs. the start/advance() state machine?
- At this point it became clear that we should explore unifying sources
and ParDo's, in particular: can we bring the magic of sources to ParDo's
but without the limitations and coding inconveniences? And this is how
SplittableDoFn was born: bringing source magic to a DoFn by providing a
RangeTracker.
- Once the idea of "splittable DoFn's" was born, it became clear that it
is strictly more general than sources; at least, in the respect that
sources have to produce output, while DoFn's don't: an SDF may very well
produce no output at all, and simply perform a side effect in a
parallel/resumable way.
- Then there were countless hours of discussions on unifying the
bounded/unbounded cases, on the particulars of RangeTracker APIs
reconciling parallelization and checkpointing, what the relation between
SDF and DF should be, etc. They culminated in the current proposal. The
proposal comes at a time when a couple of key ingredients are (almost)
ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
proposal to enable unbounded work per element.

To put it shortly:
- Yes, we will support existing Source connectors, and will support
writing new ones, possibly forever. There is no interference with current
users of Source.
- The new API is an attempt to improve the Source API, taken to its
logical limit where it turns out that users' goals can be accomplished
more easily and more generically entirely within ParDo's.

Let me know what you think, and thanks again!

On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

Hi Eugene,

Just a question: why is it in DoFn and not an improvement of Source?

If I understand correctly, it means that we will have to refactor all
existing IO: basically, what you propose is to remove all Source to
replace with NewDoFn.

I'm concerned with this approach, especially in terms of timing: clearly,
the IO is the area where we have to move forward in Beam as it will
allow new users to start in their projects.
So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
... and some people started to learn the IO API (Bounded/Unbounded
source, etc).

I think it would make more sense to enhance the IO API (Source) instead
of introducing a NewDoFn.

What are your thoughts for an IO writer like me? ;)

Regards
JB

On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:

Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose
"Splittable DoFn" - a major generalization of DoFn, which allows processing
of a single element to be non-monolithic, i.e. checkpointable and
parallelizable, as well as doing an unbounded amount of work per element.

This allows effectively replacing the current Bounded/UnboundedSource APIs
with DoFn's that are much easier to code, more scalable and composable with
the rest of the Beam programming model, and enables many use cases that
were previously difficult or impossible, as well as some non-obvious new
use cases.

This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
meetings, and now the whole thing is written up in a document:

       https://s.apache.org/splittable-do-fn

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the
pipeline (e.g. easily implement a connector to a storage system that can
export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a
DoFn that simply polls a consumer and outputs new records in a while()
loop
- Implement a log tailer by composing a DoFn that incrementally returns new
files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix
squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against
this API:

   ProcessContinuation processElement(
           ProcessContext context, OffsetRangeTracker tracker) {
     try (KafkaConsumer<String, String> consumer =
              Kafka.subscribe(context.element().topic,
                              context.element().partition)) {
       consumer.seek(tracker.start());
       while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100ms);
         if (records == null) return done();
         for (ConsumerRecord<String, String> record : records) {
           if (!tracker.tryClaim(record.offset())) {
             return resume().withFutureOutputWatermark(record.timestamp());
           }
           context.output(record);
         }
       }
     }
   }
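
For readers wondering what the tracker in the snippet above might do, here is
a minimal sketch of an offset-range tracker. It is not the proposal's
OffsetRangeTracker: the class name ToyOffsetRangeTracker and the checkpoint()
method are assumptions made for illustration, and only start() and tryClaim()
mirror calls that appear in the reader code. The idea it shows is that the
reader must claim each offset before outputting it, and that the runner can
shrink the range at any time, after which the next claim fails and the reader
returns.

```java
/** Illustrative only (hypothetical class): tracks claims inside [start, end). */
final class ToyOffsetRangeTracker {
    private final long start;
    private long end;         // exclusive; Long.MAX_VALUE models "inf"
    private long lastClaimed; // start - 1 means nothing has been claimed yet

    ToyOffsetRangeTracker(long start, long end) {
        this.start = start;
        this.end = end;
        this.lastClaimed = start - 1;
    }

    long start() {
        return start;
    }

    /** Returns true if the offset is inside the range and is now claimed. */
    boolean tryClaim(long offset) {
        if (offset <= lastClaimed) {
            throw new IllegalArgumentException("offsets must be claimed in increasing order");
        }
        if (offset >= end) {
            return false; // outside the (possibly shrunk) range: stop processing
        }
        lastClaimed = offset;
        return true;
    }

    /**
     * Assumed runner-side operation: shrink the range to what was already
     * claimed and return the start of the residual restriction [result, inf).
     */
    long checkpoint() {
        long residualStart = lastClaimed + 1;
        end = residualStart;
        return residualStart;
    }

    public static void main(String[] args) {
        ToyOffsetRangeTracker tracker = new ToyOffsetRangeTracker(0, Long.MAX_VALUE);
        System.out.println(tracker.tryClaim(0)); // true
        System.out.println(tracker.tryClaim(1)); // true
        System.out.println(tracker.checkpoint()); // 2, the residual starts at offset 2
        System.out.println(tracker.tryClaim(2)); // false, the reader should return
    }
}
```

The residual start returned by checkpoint() is what would be stored in
per-key state and handed back to processElement on the next resume.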

The document describes in detail the motivations behind this feature, the
basic idea and API, open questions, and outlines an incremental delivery
plan.

The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
loosely related to "State and Timers for DoFn" [beam-state].

Please take a look and comment!

Thanks.

[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state


--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com








--
Thanks,
Andrew

Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>




