Hey all,

An update: https://github.com/apache/incubator-beam/pull/896 has been
merged, laying groundwork and adding support for splittable DoFn to the
in-memory runner.

What this PR does:
- It defines an API, in full accordance with the proposal discussed on this
thread.
- It adds a mostly runner-agnostic expansion of the ParDo transform for a
splittable DoFn, with one runner-specific primitive transform that needs to
be overridden by every runner.
- It overrides said transform in the in-memory runner, so this works
end-to-end in the in-memory runner.
- All this code is covered by tests (unit and integration
@RunnableOnService) and appears to work properly in combination with the
rest of the Beam model: e.g., inputs to a splittable DoFn can be windowed,
and their windows and timestamps are transparently propagated.

Caveats:
- The API is marked @Experimental, but this is an understatement: it is
assumed to be in flux and is not intended to be used yet. It is overwhelmingly
likely that it *will* change in incompatible ways. DO NOT write pipelines with
this transform yet.
- It only works in the in-memory runner: the vast majority of code is
runner-agnostic, but a central runner-specific primitive transform is only
overridden by the in-memory runner.

My immediate next step is to make this work in the Cloud Dataflow streaming
runner (since this is the runner I'm most familiar with). That should show
what kinds of runner hooks are needed and get the API into shape for adding
hooks for other runners. After that I will work, either myself or with the
community, on making it work in other runners too. Once all runners
sufficiently support a particular subset of features, we can start
transitioning some connectors, or writing new ones, using that subset (I
expect that streaming connectors will come first).

Additionally, the Python SDK is considering using Splittable DoFn as the
*only* API for streaming sources (right now it doesn't have any API for
that, so there are no compatibility concerns). No implementation work has
happened yet, but it seems like a good idea.
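
For anyone skimming the thread below: the state-and-timers scheme from the
quoted Kafka example can be illustrated with a toy single-threaded loop. This
is only a sketch in plain Python, not the code in the PR and not a Beam API.
The names process_element and run, the per-call record budget, the finite
lists standing in for partitions, and the FIFO queue standing in for timers
are all assumptions made up for illustration.

```python
from collections import deque


def process_element(partition, start, budget=2):
    """Hypothetical stand-in for processElement.

    Reads up to `budget` records beginning at offset `start`, then
    checkpoints. Returns (outputs, residual_start); residual_start is None
    when there is nothing left to read, i.e. "do not resume".
    """
    end = min(start + budget, len(partition))
    outputs = partition[start:end]
    # A finite list stands in for a partition that eventually goes away.
    residual = end if end < len(partition) else None
    return outputs, residual


def run(partitions):
    """Toy runner with one thread: per-key state plus a queue of timers."""
    state = {key: 0 for key in partitions}  # key -> start offset of [start, inf)
    timers = deque(partitions)              # keys with a pending "resume" timer
    emitted = []
    while timers:
        key = timers.popleft()              # a timer fires in the context of `key`
        outputs, residual = process_element(partitions[key], state[key])
        emitted.extend((key, record) for record in outputs)
        if residual is None:
            del state[key]                  # "do not resume": drop the key's state
        else:
            state[key] = residual           # store the residual restriction
            timers.append(key)              # set a timer to resume later
    return emitted


result = run({"K1": ["a0", "a1", "a2"], "K2": ["b0", "b1", "b2", "b3"]})
print(result)
```

The output interleaves records from K1 and K2 even though only one loop is
running, which is the "cooperative scheduling" point made in the quoted
explanation. A real runner would persist the state and fire the timers itself.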

On Tue, Aug 30, 2016 at 1:45 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Thanks for the explanation Eugene and JB.
>
> By the way, I'm not trying to find holes in this, I really like the
> feature. I just sometimes wonder how a specific thing might be implemented
> with this.
>
> On Mon, 29 Aug 2016 at 18:00 Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > Hi Aljoscha,
> >
> > The watermark reporting is done via
> > ProcessContinuation.futureOutputWatermark, at the granularity of returning
> > from individual processElement() calls - you return from the call and give
> > a watermark on your future output. We assume that updating the watermark
> > is sufficient at a per-bundle level (or, if not, then that you can make
> > bundles small enough) because that's the same level at which state
> > changes, timers etc. are committed.
> > It can be implemented by setting a per-key watermark hold and updating it
> > when each call for this element returns. That's the way it is implemented
> > in my current prototype https://github.com/apache/incubator-beam/pull/896
> > (see SplittableParDo.ProcessFn).
> >
> > On Mon, Aug 29, 2016 at 2:55 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Hi,
> > > I have another question about this: currently, unbounded sources have
> > > special logic for determining the watermark and the system periodically
> > > asks the sources for the current watermark. As I understood it,
> > > watermarks are only "generated" at the sources. How will this work when
> > > sources are implemented as a combination of DoFns and SplittableDoFns?
> > > Will SplittableDoFns be asked for a watermark? Does this mean that
> > > watermarks can then be "generated" at any operation?
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov
> > > <kirpic...@google.com.invalid> wrote:
> > >
> > > > Hi JB,
> > > >
> > > > Yes, I'm assuming you're referring to the "magic" part on the
> > > > transform expansion diagram. This is indeed runner-specific, and
> > > > timers+state are likely the simplest way to do this for an SDF that
> > > > does an unbounded amount of work.
> > > >
> > > > On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré
> > > > <j...@nanthrax.net> wrote:
> > > >
> > > > > Anyway, from a runner perspective, we will have a kind of API (part
> > > > > of the Runner API) to "orchestrate" the SDF as we discussed during
> > > > > the call, right ?
> > > > >
> > > > > Regards
> > > > > JB
> > > > >
> > > > > On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
> > > > > > Hi Aljoscha,
> > > > > > > This is an excellent question! And the answer is, we don't need
> > > > > > > any new concepts like "SDF executor" and can rely on the per-key
> > > > > > > state and timers machinery that already exists in all runners
> > > > > > > because it's necessary to implement windowing/triggering
> > > > > > > properly.
> > > > > >
> > > > > > > Note that this is already somewhat addressed in the previously
> > > > > > > posted State and Timers proposal https://s.apache.org/beam-state ,
> > > > > > > under "per-key workflows".
> > > > > >
> > > > > > > Think of it this way, using the Kafka example: we'll expand it
> > > > > > > into a transform:
> > > > > > >
> > > > > > > (1) ParDo { topic -> (unique key, topic, partition, [0, inf))
> > > > > > > for partition in topic.listPartitions() }
> > > > > > > (2) GroupByKey
> > > > > > > (3) ParDo { key, topic, partition, R -> Kafka reader code in the
> > > > > > > proposal/slides }
> > > > > > >   - R is the OffsetRange restriction, which in this case will
> > > > > > > always be of the form [startOffset, inf).
> > > > > > >   - There'll be just 1 value per key, but we use GBK just to get
> > > > > > > access to the per-key state/timers machinery. This may be
> > > > > > > runner-specific; maybe some runners don't need a GBK to do that.
> > > > > >
> > > > > > > Now suppose the topic has two partitions, P1 and P2, and they get
> > > > > > > assigned unique keys K1, K2.
> > > > > > > Then the input to (3) will be a collection of: (K1, topic, P1,
> > > > > > > [0, inf)), (K2, topic, P2, [0, inf)).
> > > > > > > Suppose we have just 1 worker with just 1 thread. Now, how will
> > > > > > > this thread be able to produce elements from both P1 and P2?
> > > > > > > Here's how.
> > > > > >
> > > > > > > The thread will process (K1, topic, P1, [0, inf)), checkpoint
> > > > > > > after a certain time or after a certain number of elements are
> > > > > > > output (just like with the current UnboundedSource reading code),
> > > > > > > producing a residual restriction R1' (basically a new start
> > > > > > > offset), put R1' into the per-key state and set a timer T1 to
> > > > > > > resume.
> > > > > > > Then it will process (K2, topic, P2, [0, inf)), do the same,
> > > > > > > producing a residual restriction R2' and setting a timer T2 to
> > > > > > > resume.
> > > > > > > Then timer T1 will fire in the context of the key K1. The thread
> > > > > > > will call processElement again, this time supplying R1' as the
> > > > > > > restriction; the process repeats and after a while it checkpoints
> > > > > > > and stores R1'' into the state of K1.
> > > > > > > Then timer T2 will fire in the context of K2, run processElement
> > > > > > > for a while, set a new timer and store R2'' into the state of K2.
> > > > > > > Etc.
> > > > > > > If partition P1 goes away, the processElement call will return
> > > > > > > "do not resume", so a timer will not be set and instead the state
> > > > > > > associated with K1 will be GC'd.
> > > > > >
> > > > > > > So basically it's almost like cooperative thread scheduling:
> > > > > > > things run for a while, until the runner tells them to
> > > > > > > checkpoint, then they set a timer to resume themselves, and the
> > > > > > > runner fires the timers, and the process repeats. And, again,
> > > > > > > this only requires things that runners can already do - state and
> > > > > > > timers, but no new concept of SDF executor (and consequently no
> > > > > > > necessity to choose/tune how many you need).
> > > > > >
> > > > > > Makes sense?
> > > > > >
> > > > > > > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek
> > > > > > > <aljos...@apache.org> wrote:
> > > > > >
> > > > > >> Hi,
> > > > > > >> I have another question that I think wasn't addressed in the
> > > > > > >> meeting. At least it wasn't mentioned in the notes.
> > > > > >>
> > > > > > >> In the context of replacing sources by a combination of two
> > > > > > >> SDFs, how do you determine how many "SDF executor" instances you
> > > > > > >> need downstream? For the sake of argument assume that both SDFs
> > > > > > >> are executed with parallelism 1 (or one per worker). Now, if you
> > > > > > >> have a file source that reads from a static set of files the
> > > > > > >> first SDF would emit the filenames while the second SDF would
> > > > > > >> receive the filenames and emit their contents. This works well
> > > > > > >> and the downstream SDF can process one filename after the other.
> > > > > > >> Now, think of something like a Kafka source. The first SDF would
> > > > > > >> emit the partitions (say 4 partitions, in this example) and the
> > > > > > >> second SDF would be responsible for reading from a topic and
> > > > > > >> emitting elements. Reading from one topic never finishes so you
> > > > > > >> can't process the topics in series. I think you would need to
> > > > > > >> have 4 downstream "SDF executor" instances. The question now is:
> > > > > > >> how do you determine whether you are in the first or the second
> > > > > > >> situation?
> > > > > >>
> > > > > > >> Probably I'm just overlooking something and this is already
> > > > > > >> dealt with somewhere... :-)
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Aljoscha
> > > > > >>
> > > > > > >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com>
> > > > > > >> wrote:
> > > > > >>
> > > > > >>> Hello,
> > > > > >>>
> > > > > > >>> Thanks for the notes both Dan and Eugene, and for taking the
> > > > > > >>> time to do the presentation and answer our questions.
> > > > > >>>
> > > > > > >>> I mentioned the ongoing work on dynamic scaling on Flink
> > > > > > >>> because I suppose that it will address dynamic rebalancing
> > > > > > >>> eventually (there are multiple changes going on for dynamic
> > > > > > >>> scaling).
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
> > > > > >>>
> > > > > >>>
> > > >
> https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8
> > > > > >>>
> > > > > > >>> Anyway I am far from an expert on Flink, but probably the Flink
> > > > > > >>> guys can give their opinion about this and refer to a more
> > > > > > >>> precise document than the ones I mentioned.
> > > > > > >>>
> > > > > > >>> Thanks again,
> > > > > > >>> Ismaël
> > > > > >>>
> > > > > > >>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré
> > > > > > >>> <j...@nanthrax.net> wrote:
> > > > > >>>
> > > > > >>>> Great summary Eugene and Dan.
> > > > > >>>>
> > > > > >>>> And thanks again for the details, explanation, and discussion.
> > > > > >>>>
> > > > > >>>> Regards
> > > > > >>>> JB
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
> > > > > >>>>
> > > > > >>>>> Thanks for attending, everybody!
> > > > > >>>>>
> > > > > >>>>> Here are meeting notes (thanks Dan!).
> > > > > >>>>>
> > > > > >>>>> Q: Will SplittableDoFn enable better repartitioning of the
> > > > > >> input/output
> > > > > >>>>> data?
> > > > > >>>>> A: Not really; repartitioning is orthogonal to SDF.
> > > > > >>>>>
> > > > > >>>>> Current Source API suffers from lack of composition and
> > > scalability
> > > > > >>>>> because
> > > > > >>>>> we treat sources too much as metadata, not enough as data.
> > > > > >>>>>
> > > > > >>>>> Q(slide with transform expansion): who does the "magic"?
> > > > > >>>>> A: The runner. Checkpointing and dynamically splitting
> > > restrictions
> > > > > >> will
> > > > > >>>>> require collaboration with the runner.
> > > > > >>>>>
> > > > > >>>>> Q: How does the runner interact with the DoFn to control the
> > > > > >>> restrictions?
> > > > > >>>>> Is it related to the centralized job tracker etc.?
> > > > > >>>>> A: RestrictionTracker is a simple helper object, that exists
> > > purely
> > > > > on
> > > > > >>> the
> > > > > >>>>> worker while executing a single partition, and interacts with
> > the
> > > > > >> worker
> > > > > >>>>> harness part of the runner. Not to be confused with the
> > > centralized
> > > > > >> job
> > > > > >>>>> tracker (master) - completely unrelated. Worker harness, of
> > > course,
> > > > > >>>>> interacts with the master in some relevant ways (e.g.
> Dataflow
> > > > master
> > > > > >>> can
> > > > > >>>>> tell "you're a straggler, you should split").
> > > > > >>>>>
> > > > > >>>>> Q: Is this a new DoFn subclass, or how will this integrate
> with
> > > the
> > > > > >>>>> existing code?
> > > > > >>>>> A: It's a feature of reflection-based DoFn (
> > > > > >>> https://s.apache.org/a-new-do
> > > > > >>>>> fn)
> > > > > >>>>> - just another optional parameter of type RestrictionTracker
> to
> > > > > >>>>> processElement() which is dynamically bound via reflection,
> so
> > > > fully
> > > > > >>>>> backward/forward compatible, and looks to users like a
> regular
> > > > DoFn.
> > > > > >>>>>
> > > > > >>>>> Q: why is fractionClaimed a double?
> > > > > >>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling,
> > > dynamic
> > > > > >>>>> rebalancing) requires a uniform way to represent progress
> > through
> > > > > >>>>> different
> > > > > >>>>> sources.
> > > > > >>>>>
> > > > > >>>>> Q: Spark runner is microbatch-based, so this seems to map
> well
> > > onto
> > > > > >>>>> checkpoint/resume, right?
> > > > > >>>>> A: Yes; actually the Dataflow runner is, at a worker level,
> > also
> > > > > >>>>> microbatch-based. The way SDF interacts with a runner will be
> > > very
> > > > > >>> similar
> > > > > >>>>> to how a Bounded/UnboundedSource interacts with a runner.
> > > > > >>>>>
> > > > > >>>>> Q: Using SDF, what would be the "packaging" of the IO?
> > > > > >>>>> A: Same as currently: package IO's as PTransforms and their
> > > > > >>> implementation
> > > > > >>>>> under the hood can be anything: Source, simple ParDo's, SDF,
> > etc.
> > > > > E.g.
> > > > > >>>>> Datastore was recently refactored from BoundedSource to ParDo
> > > > (ended
> > > > > >> up
> > > > > >>>>> simpler and more scalable), transparently to users.
> > > > > >>>>>
> > > > > >>>>> Q: What's the timeline; what to do with the IOs currently in
> > > > > >>> development?
> > > > > >>>>> A: Timeline is O(months). Keep doing what you're doing and
> > > working
> > > > on
> > > > > >>> top
> > > > > >>>>> of Source APIs when necessary and simple ParDo's otherwise.
> > > > > >>>>>
> > > > > >>>>> Q: What's the impact for the runner writers?
> > > > > >>>>> A: Tentatively expected that most of the code for running an
> > SDF
> > > > will
> > > > > >> be
> > > > > >>>>> common to runners, with some amount of per-runner glue code,
> > just
> > > > > like
> > > > > >>>>> GBK/windowing/triggering. Impact on Dataflow runner is larger
> > > since
> > > > > it
> > > > > >>>>> supports dynamic rebalancing in batch mode and this is the
> > > hardest
> > > > > >> part,
> > > > > >>>>> but for other runners shouldn't be too hard.
> > > > > >>>>>
> > > > > >>>>> JB: Talend has people who can help with this: e.g. help
> > integrate
> > > > > into
> > > > > >>>>> Spark runner, refactor IOs etc. Amit also willing to chat
> about
> > > > > >>> supporting
> > > > > >>>>> SDF in Spark runner.
> > > > > >>>>>
> > > > > >>>>> Ismael: There's a Flink proposal about dynamic rebalancing.
> > > Ismael
> > > > > >> will
> > > > > >>>>> send a link.
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <
> > > > > j...@nanthrax.net
> > > > > >>>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>> Hi Eugene,
> > > > > >>>>>>
> > > > > >>>>>> thanks for the reminder.
> > > > > >>>>>>
> > > > > >>>>>> Just to prepare some topics for the call, please find some
> > > points:
> > > > > >>>>>>
> > > > > >>>>>> 1. Using SDF, what would be the "packaging" of the IO ? It
> > > sounds
> > > > to
> > > > > >> me
> > > > > >>>>>> that we can keep the IO packaging style (using with* setters
> > for
> > > > the
> > > > > >> IO
> > > > > >>>>>> configuration) and replace PTransform, Source, Reader, ...
> > > > directly
> > > > > >>> with
> > > > > >>>>>> SDF. Correct ?
> > > > > >>>>>>
> > > > > >>>>>> 2. What's your plan in term of release to include SDF ? We
> > have
> > > > > >> several
> > > > > >>>>>> IOs in preparation and I wonder if it's worth to start to
> use
> > > the
> > > > > new
> > > > > >>>>>> SDF API or not.
> > > > > >>>>>>
> > > > > >>>>>> 3. What's the impact for the runner writers ? The runners
> will
> > > > have
> > > > > >> to
> > > > > >>>>>> support SDF, that could be tricky depending of the execution
> > > > engine.
> > > > > >> In
> > > > > >>>>>> the worst case where the runner can't fully support SDF,
> does
> > it
> > > > > mean
> > > > > >>>>>> that most of our IOs will be useless ?
> > > > > >>>>>>
> > > > > >>>>>> Just my dumb topics ;)
> > > > > >>>>>>
> > > > > >>>>>> Thanks,
> > > > > >>>>>> See you at 8am !
> > > > > >>>>>>
> > > > > >>>>>> Regards
> > > > > >>>>>> JB
> > > > > >>>>>>
> > > > > >>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
> > > > > >>>>>>
> > > > > >>>>>>> Hello everybody,
> > > > > >>>>>>>
> > > > > >>>>>>> Just a reminder:
> > > > > >>>>>>>
> > > > > >>>>>>> The meeting is happening tomorrow - Friday Aug 19th,
> 8am-9am
> > > PST,
> > > > > to
> > > > > >>>>>>> join
> > > > > >>>>>>> the call go to
> > > > > >>>>>>>
> > > https://hangouts.google.com/hangouts/_/google.com/splittabledofn
> > > > .
> > > > > >>>>>>> I intend to go over the proposed design and then have a
> > > free-form
> > > > > >>>>>>> discussion.
> > > > > >>>>>>>
> > > > > >>>>>>> Please have a skim through the proposal doc:
> > > > https://s.apache.org/
> > > > > >>>>>>> splittable-do-fn
> > > > > >>>>>>> I also made some slides that are basically a trimmed-down
> > > version
> > > > > of
> > > > > >>> the
> > > > > >>>>>>> doc to use as a guide when conducting the meeting,
> > > > > >>>>>>>
> > > > > >>>>>>>
> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047
> > > > > >>>>>> Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
> > > > > >>>>>>
> > > > > >>>>>>> .
> > > > > >>>>>>>
> > > > > >>>>>>> I will post notes from the meeting on this thread
> afterwards.
> > > > > >>>>>>>
> > > > > >>>>>>> Thanks, looking forward.
> > > > > >>>>>>>
> > > > > >>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin
> > > > > >>>>>>> <dhalp...@google.com.invalid
> > > > > >>>>>>>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>> This is pretty cool! I'll be there too. (unless the hangout
> > > gets
> > > > > too
> > > > > >>>>>>>>
> > > > > >>>>>>> full
> > > > > >>>>>>
> > > > > >>>>>>> -- if so, I'll drop out in favor of others who aren't lucky
> > > > enough
> > > > > >> to
> > > > > >>>>>>>>
> > > > > >>>>>>> get
> > > > > >>>>>>
> > > > > >>>>>>> to talk to Eugene all the time.)
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <
> > > > > >>>>>>>>
> > > > > >>>>>>> psaltis.and...@gmail.com>
> > > > > >>>>>>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>> +1 I'll join
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <
> > > > > >>>>>>>>>
> > > > > >>>>>>>> apban...@cisco.com
> > > > > >>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> wrote:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> + 1, me2
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>> <javascript:;>>
> > > > > >>>>>>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> +1 as in I'll join ;-)
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>> <kirpic...@google.com.invalid
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Sounds good, thanks!
> > > > > >>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
> > > > > >>>>>>>>>>>>
> https://staging.talkgadget.google.com/hangouts/_/google
> > .
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>> com/splittabledofn
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré
> <
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>> j...@nanthrax.net
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> <javascript:;>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Hi
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th.
> What
> > > > about
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Friday
> > > > > >>>>>>>>
> > > > > >>>>>>>>> 19th ?
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Regards
> > > > > >>>>>>>>>>>>> JB
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > > > > >>>>>>>>>>>>> <kirpic...@google.com.INVALID> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Hi JB,
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Sounds great, does the suggested time over
> > > videoconference
> > > > > >> work
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> for
> > > > > >>>>>>>>
> > > > > >>>>>>>>> you?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste
> Onofré
> > <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> j...@nanthrax.net <javascript:;>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Hi Eugene
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> May we talk together next week ? I like the
> > proposal. I
> > > > > >> would
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> just
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> need
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> some details for my understanding.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks
> > > > > >>>>>>>>>>>>>>> Regards
> > > > > >>>>>>>>>>>>>>> JB
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > > > > >>>>>>>>>>>>>>> <kirpic...@google.com.INVALID> wrote:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Hi JB,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> What are your thoughts on this?
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to
> > > explain
> > > > > >> more
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> about
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> this
> > > > > >>>>>>>>>>>>>>>> proposal if necessary, since I understand it is a
> > lot
> > > to
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> digest.
> > > > > >>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time,
> over
> > > > > >>> Hangouts?
> > > > > >>>>>>>>>>>>>>>> (link:
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > https://staging.talkgadget.google.com/hangouts/_/google.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>> com/splittabledofn
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> -
> > > > > >>>>>>>>>>>>>>>> I confirmed that it can be joined without being
> > logged
> > > > > >> into a
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Google
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> account)
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Who'd be interested in attending, and does this
> > > > time/date
> > > > > >>> work
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> for
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> people?
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> <kirpic...@google.com <javascript:;>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> It sounds like you are concerned about continued
> > > > support
> > > > > >> for
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> existing
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> IO's
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> people have developed, and about backward
> > > > compatibility?
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all
> > > > existing
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Source-based
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> connectors will continue to work [though the
> > document
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> proposes
> > > > > >>>>>>>>
> > > > > >>>>>>>>> at
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> some
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> point to make Read.from(Source) to translate to a
> > > > wrapper
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> SDF
> > > > > >>>>>>>>
> > > > > >>>>>>>>> under
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> hood, to exercise the feature more and to make
> sure
> > > > that
> > > > > >> it
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> is
> > > > > >>>>>>>>
> > > > > >>>>>>>>> strictly
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> more powerful - but this is an optional
> > > implementation
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> detail].
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly -
> > > > > >> "replacing
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> the
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> Source
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> API": a better phrasing would be "introducing a
> new
> > > API
> > > > > so
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> powerful
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> and
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> easy-to-use that hopefully people will choose it
> > over
> > > > the
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Source
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> API
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> all
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> the time, even though they don't have to" :) And
> we
> > > can
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> discuss
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> whether or
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> not to actually deprecate/remove the Source API
> at
> > > some
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> point
> > > > > >>>>>>>>
> > > > > >>>>>>>>> down
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> road, once it becomes clear whether this is the
> > case
> > > or
> > > > > >> not.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> To give more context: this proposal came out of
> > > > > >> discussions
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> within
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> the SDK
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam
> > > project
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> existed,
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> on
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> how to
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> make major improvements to the Source API;
> perhaps
> > it
> > > > > will
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> clarify
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> things
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> if I give a history of the ideas discussed:
> > > > > >>>>>>>>>>>>>>>>> - The first idea was to introduce a
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Read.from(PCollection<Source>)
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> transform while keeping the Source API intact - this,
> > given
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> appropriate
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> implementation, would solve most of the
> scalability
> > > and
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> composability
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> issues of IO's. Then most connectors would look
> like
> > :
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> ParDo<A,
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> Source<B>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> + Read.from().
> > > > > >>>>>>>>>>>>>>>>> - Then we figured that the Source class is an
> > > > unnecessary
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> abstraction, as
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> it simply holds data. What if we only had a
> > Reader<S,
> > > > B>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> class
> > > > > >>>>>>>>
> > > > > >>>>>>>>> where
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> S is
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> the source type and B the output type? Then
> > > connectors
> > > > > >> would
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> be
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> something
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical
> > Read.using(Reader<S,
> > > > > B>).
> > > > > >>>>>>>>>>>>>>>>> - Then somebody remarked that some of the
> features
> > of
> > > > > >> Source
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> are
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> useful to
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress
> > when
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> processing a
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> very
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> heavy element, or ability to produce very large
> > > output
> > > > in
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> parallel.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> - The two previous bullets were already hinting
> that
> > > the
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Read.using()
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> primitive might not be so special: it just takes S
> > and
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> produces
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> B:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> isn't
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> that what a ParDo does, plus some source magic,
> > minus
> > > > the
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> convenience
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> of
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> c.output() vs. the start/advance() state machine?
> > > > > >>>>>>>>>>>>>>>>> - At this point it became clear that we should
> > > explore
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> unifying
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> sources
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> and ParDo's, in particular: can we bring the
> magic
> > of
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> sources
> > > > > >>>>>>>>
> > > > > >>>>>>>>> to
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> ParDo's
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> but without the limitations and coding
> > > inconveniences?
> > > > > And
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> this
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> is
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> how
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic
> to a
> > > > DoFn
> > > > > >> by
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> providing
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> a
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> RangeTracker.
> > > > > >>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born,
> it
> > > > > became
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> clear
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> that
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> it
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> is strictly more general than sources; at least,
> in
> > > the
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> respect
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> that
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> sources have to produce output, while DoFn's don't:
> > an
> > > > SDF
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> may
> > > > > >>>>>>>>
> > > > > >>>>>>>>> very
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> well
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> produce no output at all, and simply perform a
> side
> > > > > effect
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> in
> > > > > >>>>>>>>
> > > > > >>>>>>>>> a
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> parallel/resumable way.
> > > > > >>>>>>>>>>>>>>>>> - Then there were countless hours of discussions
> on
> > > > > >> unifying
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> the
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> bounded/unbounded cases, on the particulars of
> > RangeTracker
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> APIs
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> reconciling parallelization and checkpointing, what the
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> relation
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> between
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the
> > > > current
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> proposal.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> The
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> proposal comes at a time when a couple of key
> > > > ingredients
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> are
> > > > > >>>>>>>>
> > > > > >>>>>>>>> (almost)
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular
> > DoFn,
> > > > and
> > > > > >> the
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> State/Timers
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> proposal to enable unbounded work per element.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> To put it shortly:
> > > > > >>>>>>>>>>>>>>>>> - Yes, we will support existing Source
> connectors,
> > > and
> > > > > >> will
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> support
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> writing new ones, possibly forever. There is no
> > > > > interference
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> with
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> current
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> users of Source.
> > > > > >>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source
> > > API,
> > > > > >> taken
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> to
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> its
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> logical limit where it turns out that users' goals
> > can
> > > be
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> accomplished
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> easier and more generically entirely within
> > ParDo's.
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Let me know what you think, and thanks again!
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste
> Onofré
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> <j...@nanthrax.net <javascript:;>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Hi Eugene,
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Just a question: why is it in DoFn and note an
> > > > > >> improvement
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> of
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Source
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ?
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will
> > > have
> > > > to
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> refactore
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> all
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> existing IO: basically, what you propose is to
> > remove
> > > > all
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Source
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> to
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> replace with NewDoFn.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> I'm concern with this approach, especially in
> term
> > > of
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> timing:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> clearly,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> the IO is the area where we have to move forward
> in
> > > > Beam
> > > > > >> as
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> it
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> will
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> allow new users to start in their projects.
> > > > > >>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS,
> > > > Cassandra,
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> MongoDB,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> JDBC,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> ... and some people started to learn the IO API
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> (Bounded/Unbouded
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> source, etc).
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the
> IO
> > > API
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> (Source)
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> instead
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> of introducing a NewDoFn.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> What are your thoughts for IO writer like me ?
> ;)
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Regards
> > > > > >>>>>>>>>>>>>>>>>> JB
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Hello Beam community,
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw)
> > would
> > > > > like
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> to
> > > > > >>>>>>>>
> > > > > >>>>>>>>> propose
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of
> DoFn,
> > > > which
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> allows
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> processing
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e.
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> checkpointable
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> and
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded
> amount
> > of
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> work
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> per
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> element.
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> This allows effectively replacing the current
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> Bounded/UnboundedSource
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> APIs
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more
> > > > scalable
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> and
> > > > > >>>>>>>>
> > > > > >>>>>>>>> composable
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> with
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> the rest of the Beam programming model, and
> > enables
> > > > > many
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> use
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> cases
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> that
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> were previously difficult or impossible, as well
> as
> > > > some
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> non-obvious new
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> use cases.
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA
> > > > > >> [BEAM-65]
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> and
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> some
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Beam
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> meetings, and now the whole thing is written up
> in
> > a
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> document:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>         https://s.apache.org/splittable-do-fn
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Here are some things that become possible with
> > > > > >> Splittable
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> DoFn:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> - Efficiently read a filepattern matching millions of
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> files
> > > > > >>>>>>>>
> > > > > >>>>>>>>> - Read a collection of files that are produced by an
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> earlier
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> step
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> in the
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a
> > > > storage
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> system
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> that can
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> export itself to files)
> > > > > >>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> partitions"
> > > > > >>>>>>>>
> > > > > >>>>>>>>> DoFn
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> with a
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new
> > > > records
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> in
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> a
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> while()
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> loop
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn
> that
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> incrementally
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> returns
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> new
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> files in a directory and a DoFn that tails a
> file
> > > > > >>>>>>>>>>>>>>>>>>> - Implement a parallel "count friends in
> common"
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> algorithm
> > > > > >>>>>>>>
> > > > > >>>>>>>>> (matrix
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> squaring) with good work balancing
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical
> > Kafka
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> reader
> > > > > >>>>>>>>
> > > > > >>>>>>>>> written
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> against
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> this API:
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>     ProcessContinuation processElement(
> > > > > >>>>>>>>>>>>>>>>>>>             ProcessContext context,
> > > > OffsetRangeTracker
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> tracker)
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> {
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>       try (KafkaConsumer<String, String> consumer =
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > >  Kafka.subscribe(context.element().topic,
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>  context.element().partition)) {
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>>         consumer.seek(tracker.start());
> > > > > >>>>>>>>>>>>>>>>>>>         while (true) {
> > > > > >>>>>>>>>>>>>>>>>>>           ConsumerRecords<String, String>
> > records =
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> consumer.poll(100ms);
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>           if (records == null) return done();
> > > > > >>>>>>>>>>>>>>>>>>>           for (ConsumerRecord<String, String>
> > > record
> > > > :
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> records)
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> {
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>             if
> (!tracker.tryClaim(record.offset())) {
> > > > > >>>>>>>>>>>>>>>>>>>               return
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>
> > > > resume().withFutureOutputWatermark(record.timestamp());
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>>             }
> > > > > >>>>>>>>>>>>>>>>>>>             context.output(record);
> > > > > >>>>>>>>>>>>>>>>>>>           }
> > > > > >>>>>>>>>>>>>>>>>>>         }
> > > > > >>>>>>>>>>>>>>>>>>>       }
> > > > > >>>>>>>>>>>>>>>>>>>     }
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> The document describes in detail the
> motivations
> > > > behind
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> this
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> feature,
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> basic idea and API, open questions, and
> outlines
> > an
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> incremental
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> delivery
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> plan.
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based
> > new
> > > > > DoFn
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> [new-do-fn]
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> and is
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>> loosely related to "State and Timers for DoFn"
> > > > > >>>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>> [beam-state].
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
