Thanks for the explanation Eugene and JB.

By the way, I'm not trying to find holes in this, I really like the
feature. I just sometimes wonder how a specific thing might be implemented
with this.

On Mon, 29 Aug 2016 at 18:00 Eugene Kirpichov <[email protected]>
wrote:

> Hi Aljoscha,
>
> The watermark reporting is done via
> ProcessContinuation.futureOutputWatermark, at the granularity of returning
> from individual processElement() calls - you return from the call and give
> a watermark on your future output. We assume that updating watermark is
> sufficient at a per-bundle level (or, if not, then that you can make
> bundles small enough) cause that's the same level at which state changes,
> timers etc. are committed.
> It can be implemented by setting a per-key watermark hold and updating it
> when each call for this element returns. That's the way it is implemented
> in my current prototype https://github.com/apache/incubator-beam/pull/896
> (see
> SplittableParDo.ProcessFn)
>
> On Mon, Aug 29, 2016 at 2:55 AM Aljoscha Krettek <[email protected]>
> wrote:
>
> > Hi,
> > I have another question about this: currently, unbounded sources have
> > special logic for determining the watermark and the system periodically
> > asks the sources for the current watermark. As I understood it,
> watermarks
> > are only "generated" at the sources. How will this work when sources are
> > implemented as a combination of DoFns and SplittableDoFns? Will
> > SplittableDoFns be asked for a watermark, does this mean that watermarks
> > can then be "generated" at any operation?
> >
> > Cheers,
> > Aljoscha
> >
> > On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov
> <[email protected]
> > >
> > wrote:
> >
> > > Hi JB,
> > >
> > > Yes, I'm assuming you're referring to the "magic" part on the transform
> > > expansion diagram. This is indeed runner-specific, and timers+state are
> > > likely the simplest way to do this for an SDF that does unbounded
> amount
> > of
> > > work.
> > >
> > > On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <[email protected]
> >
> > > wrote:
> > >
> > > > Anyway, from a runner perspective, we will have kind of API (part of
> > the
> > > > Runner API) to "orchestrate" the SDF as we discussed during the call,
> > > > right ?
> > > >
> > > > Regards
> > > > JB
> > > >
> > > > On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
> > > > > Hi Aljoscha,
> > > > > This is an excellent question! And the answer is, we don't need any
> > new
> > > > > concepts like "SDF executor" and can rely on the per-key state and
> > > timers
> > > > > machinery that already exists in all runners because it's necessary
> > to
> > > > > implement windowing/triggering properly.
> > > > >
> > > > > Note that this is already somewhat addressed in the previously
> posted
> > > > State
> > > > > and Timers proposal https://s.apache.org/beam-state , under
> "per-key
> > > > > workflows".
> > > > >
> > > > > Think of it this way, using the Kafka example: we'll expand it
> into a
> > > > > transform:
> > > > >
> > > > > (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for
> > > > > partition in topic.listPartitions() }
> > > > > (2) GroupByKey
> > > > > (3) ParDo { key, topic, partition, R -> Kafka reader code in the
> > > > > proposal/slides }
> > > > >   - R is the OffsetRange restriction which in this case will be
> > always
> > > of
> > > > > the form [startOffset, inf).
> > > > >   - there'll be just 1 value per key, but we use GBK to just get
> > access
> > > > to
> > > > > the per-key state/timers machinery. This may be runner-specific;
> > maybe
> > > > some
> > > > > runners don't need a GBK to do that.
> > > > >
> > > > > Now suppose the topic has two partitions, P1 and P2, and they get
> > > > assigned
> > > > > unique keys K1, K2.
> > > > > Then the input to (3) will be a collection of: (K1, topic, P1, [0,
> > > inf)),
> > > > > (K2, topic, P2, [0, inf)).
> > > > > Suppose we have just 1 worker with just 1 thread. Now, how will
> this
> > > > thread
> > > > > be able to produce elements from both P1 and P2? here's how.
> > > > >
> > > > > The thread will process (K1, topic, P1, [0, inf)), checkpoint
> after a
> > > > > certain time or after a certain number of elements are output (just
> > > like
> > > > > with the current UnboundedSource reading code) producing a residual
> > > > > restriction R1' (basically a new start timestamp), put R11 into the
> > > > per-key
> > > > > state and set a timer T1 to resume.
> > > > > Then it will process (K2, topic, P2, [0, inf)), do the same
> > producing a
> > > > > residual restriction R2' and setting a timer T2 to resume.
> > > > > Then timer T1 will fire in the context of the key K1. The thread
> will
> > > > call
> > > > > processElement again, this time supplying R1' as the restriction;
> the
> > > > > process repeats and after a while it checkpoints and stores R1''
> into
> > > > state
> > > > > of K1.
> > > > > Then timer T2 will fire in the context of K2, run processElement
> for
> > a
> > > > > while, set a new timer and store R2'' into the state of K2.
> > > > > Etc.
> > > > > If partition 1 goes away, the processElement call will return "do
> not
> > > > > resume", so a timer will not be set and instead the state
> associated
> > > with
> > > > > K1 will be GC'd.
> > > > >
> > > > > So basically it's almost like cooperative thread scheduling: things
> > run
> > > > for
> > > > > a while, until the runner tells them to checkpoint, then they set a
> > > timer
> > > > > to resume themselves, and the runner fires the timers, and the
> > process
> > > > > repeats. And, again, this only requires things that runners can
> > already
> > > > do
> > > > > - state and timers, but no new concept of SDF executor (and
> > > consequently
> > > > no
> > > > > necessity to choose/tune how many you need).
> > > > >
> > > > > Makes sense?
> > > > >
> > > > > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <
> > [email protected]>
> > > > > wrote:
> > > > >
> > > > >> Hi,
> > > > >> I have another question that I think wasn't addressed in the
> > meeting.
> > > At
> > > > >> least it wasn't mentioned in the notes.
> > > > >>
> > > > >> In the context of replacing sources by a combination of to SDFs,
> how
> > > do
> > > > you
> > > > >> determine how many "SDF executor" instances you need downstream?
> For
> > > the
> > > > >> sake of argument assume that both SDFs are executed with
> > parallelism 1
> > > > (or
> > > > >> one per worker). Now, if you have a file source that reads from a
> > > static
> > > > >> set of files the first SDF would emit the filenames while the
> second
> > > SDF
> > > > >> would receive the filenames and emit their contents. This works
> well
> > > and
> > > > >> the downstream SDF can process one filename after the other. Now,
> > > think
> > > > of
> > > > >> something like a Kafka source. The first SDF would emit the
> > partitions
> > > > (say
> > > > >> 4 partitions, in this example) and the second SDF would be
> > responsible
> > > > for
> > > > >> reading from a topic and emitting elements. Reading from one topic
> > > never
> > > > >> finishes so you can't process the topics in series. I think you
> > would
> > > > need
> > > > >> to have 4 downstream "SDF executor" instances. The question now
> is:
> > > how
> > > > do
> > > > >> you determine whether you are in the first or the second
> situation?
> > > > >>
> > > > >> Probably I'm just overlooking something and this is already dealt
> > with
> > > > >> somewhere... :-)
> > > > >>
> > > > >> Cheers,
> > > > >> Aljoscha
> > > > >>
> > > > >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]>
> > wrote:
> > > > >>
> > > > >>> Hello,
> > > > >>>
> > > > >>> Thanks for the notes both Dan and Eugene, and for taking the time
> > to
> > > do
> > > > >> the
> > > > >>> presentation and  answer our questions.
> > > > >>>
> > > > >>> I mentioned the ongoing work on dynamic scaling on Flink because
> I
> > > > >> suppose
> > > > >>> that it will address dynamic rebalancing eventually (there are
> > > multiple
> > > > >>> changes going on for dynamic scaling).
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
> > > > >>>
> > > > >>>
> > > https://lists.apache.org/[email protected]:lte=1M:FLIP-8
> > > > >>>
> > > > >>> Anyway I am far from an expert on flink, but probably the flink
> > guys
> > > > can
> > > > >>> give their opinion about this and refer to a more precise
> document
> > > that
> > > > >> the
> > > > >>> ones I mentioned..
> > > > >>>
> > > > >>> ​Thanks again,
> > > > >>> Ismaël​
> > > > >>>
> > > > >>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <
> > > [email protected]
> > > > >
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Great summary Eugene and Dan.
> > > > >>>>
> > > > >>>> And thanks again for the details, explanation, and discussion.
> > > > >>>>
> > > > >>>> Regards
> > > > >>>> JB
> > > > >>>>
> > > > >>>>
> > > > >>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
> > > > >>>>
> > > > >>>>> Thanks for attending, everybody!
> > > > >>>>>
> > > > >>>>> Here are meeting notes (thanks Dan!).
> > > > >>>>>
> > > > >>>>> Q: Will SplittableDoFn enable better repartitioning of the
> > > > >> input/output
> > > > >>>>> data?
> > > > >>>>> A: Not really; repartitioning is orthogonal to SDF.
> > > > >>>>>
> > > > >>>>> Current Source API suffers from lack of composition and
> > scalability
> > > > >>>>> because
> > > > >>>>> we treat sources too much as metadata, not enough as data.
> > > > >>>>>
> > > > >>>>> Q(slide with transform expansion): who does the "magic"?
> > > > >>>>> A: The runner. Checkpointing and dynamically splitting
> > restrictions
> > > > >> will
> > > > >>>>> require collaboration with the runner.
> > > > >>>>>
> > > > >>>>> Q: How does the runner interact with the DoFn to control the
> > > > >>> restrictions?
> > > > >>>>> Is it related to the centralized job tracker etc.?
> > > > >>>>> A: RestrictionTracker is a simple helper object, that exists
> > purely
> > > > on
> > > > >>> the
> > > > >>>>> worker while executing a single partition, and interacts with
> the
> > > > >> worker
> > > > >>>>> harness part of the runner. Not to be confused with the
> > centralized
> > > > >> job
> > > > >>>>> tracker (master) - completely unrelated. Worker harness, of
> > course,
> > > > >>>>> interacts with the master in some relevant ways (e.g. Dataflow
> > > master
> > > > >>> can
> > > > >>>>> tell "you're a straggler, you should split").
> > > > >>>>>
> > > > >>>>> Q: Is this a new DoFn subclass, or how will this integrate with
> > the
> > > > >>>>> existing code?
> > > > >>>>> A: It's a feature of reflection-based DoFn (
> > > > >>> https://s.apache.org/a-new-do
> > > > >>>>> fn)
> > > > >>>>> - just another optional parameter of type RestrictionTracker to
> > > > >>>>> processElement() which is dynamically bound via reflection, so
> > > fully
> > > > >>>>> backward/forward compatible, and looks to users like a regular
> > > DoFn.
> > > > >>>>>
> > > > >>>>> Q: why is fractionClaimed a double?
> > > > >>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling,
> > dynamic
> > > > >>>>> rebalancing) requires a uniform way to represent progress
> through
> > > > >>>>> different
> > > > >>>>> sources.
> > > > >>>>>
> > > > >>>>> Q: Spark runner is microbatch-based, so this seems to map well
> > onto
> > > > >>>>> checkpoint/resume, right?
> > > > >>>>> A: Yes; actually the Dataflow runner is, at a worker level,
> also
> > > > >>>>> microbatch-based. The way SDF interacts with a runner will be
> > very
> > > > >>> similar
> > > > >>>>> to how a Bounded/UnboundedSource interacts with a runner.
> > > > >>>>>
> > > > >>>>> Q: Using SDF, what would be the "packaging" of the IO?
> > > > >>>>> A: Same as currently: package IO's as PTransforms and their
> > > > >>> implementation
> > > > >>>>> under the hood can be anything: Source, simple ParDo's, SDF,
> etc.
> > > > E.g.
> > > > >>>>> Datastore was recently refactored from BoundedSource to ParDo
> > > (ended
> > > > >> up
> > > > >>>>> simpler and more scalable), transparently to users.
> > > > >>>>>
> > > > >>>>> Q: What's the timeline; what to do with the IOs currently in
> > > > >>> development?
> > > > >>>>> A: Timeline is O(months). Keep doing what you're doing and
> > working
> > > on
> > > > >>> top
> > > > >>>>> of Source APIs when necessary and simple ParDo's otherwise.
> > > > >>>>>
> > > > >>>>> Q: What's the impact for the runner writers?
> > > > >>>>> A: Tentatively expected that most of the code for running an
> SDF
> > > will
> > > > >> be
> > > > >>>>> common to runners, with some amount of per-runner glue code,
> just
> > > > like
> > > > >>>>> GBK/windowing/triggering. Impact on Dataflow runner is larger
> > since
> > > > it
> > > > >>>>> supports dynamic rebalancing in batch mode and this is the
> > hardest
> > > > >> part,
> > > > >>>>> but for other runners shouldn't be too hard.
> > > > >>>>>
> > > > >>>>> JB: Talend has people who can help with this: e.g. help
> integrate
> > > > into
> > > > >>>>> Spark runner, refactor IOs etc. Amit also willing to chat about
> > > > >>> supporting
> > > > >>>>> SDF in Spark runner.
> > > > >>>>>
> > > > >>>>> Ismael: There's a Flink proposal about dynamic rebalancing.
> > Ismael
> > > > >> will
> > > > >>>>> send a link.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <
> > > > [email protected]
> > > > >>>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>> Hi Eugene,
> > > > >>>>>>
> > > > >>>>>> thanks for the reminder.
> > > > >>>>>>
> > > > >>>>>> Just to prepare some topics for the call, please find some
> > points:
> > > > >>>>>>
> > > > >>>>>> 1. Using SDF, what would be the "packaging" of the IO ? It
> > sounds
> > > to
> > > > >> me
> > > > >>>>>> that we can keep the IO packaging style (using with* setters
> for
> > > the
> > > > >> IO
> > > > >>>>>> configuration) and replace PTransform, Source, Reader, ...
> > > directly
> > > > >>> with
> > > > >>>>>> SDF. Correct ?
> > > > >>>>>>
> > > > >>>>>> 2. What's your plan in term of release to include SDF ? We
> have
> > > > >> several
> > > > >>>>>> IOs in preparation and I wonder if it's worth to start to use
> > the
> > > > new
> > > > >>>>>> SDF API or not.
> > > > >>>>>>
> > > > >>>>>> 3. What's the impact for the runner writers ? The runners will
> > > have
> > > > >> to
> > > > >>>>>> support SDF, that could be tricky depending of the execution
> > > engine.
> > > > >> In
> > > > >>>>>> the worst case where the runner can't fully support SDF, does
> it
> > > > mean
> > > > >>>>>> that most of our IOs will be useless ?
> > > > >>>>>>
> > > > >>>>>> Just my dumb topics ;)
> > > > >>>>>>
> > > > >>>>>> Thanks,
> > > > >>>>>> See you at 8am !
> > > > >>>>>>
> > > > >>>>>> Regards
> > > > >>>>>> JB
> > > > >>>>>>
> > > > >>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
> > > > >>>>>>
> > > > >>>>>>> Hello everybody,
> > > > >>>>>>>
> > > > >>>>>>> Just a reminder:
> > > > >>>>>>>
> > > > >>>>>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am
> > PST,
> > > > to
> > > > >>>>>>> join
> > > > >>>>>>> the call go to
> > > > >>>>>>>
> > https://hangouts.google.com/hangouts/_/google.com/splittabledofn
> > > .
> > > > >>>>>>> I intend to go over the proposed design and then have a
> > free-form
> > > > >>>>>>> discussion.
> > > > >>>>>>>
> > > > >>>>>>> Please have a skim through the proposal doc:
> > > https://s.apache.org/
> > > > >>>>>>> splittable-do-fn
> > > > >>>>>>> I also made some slides that are basically a trimmed-down
> > version
> > > > of
> > > > >>> the
> > > > >>>>>>> doc to use as a guide when conducting the meeting,
> > > > >>>>>>>
> > > > >>>>>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047
> > > > >>>>>> Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
> > > > >>>>>>
> > > > >>>>>>> .
> > > > >>>>>>>
> > > > >>>>>>> I will post notes from the meeting on this thread afterwards.
> > > > >>>>>>>
> > > > >>>>>>> Thanks, looking forward.
> > > > >>>>>>>
> > > > >>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin
> > > > >>>>>>> <[email protected]
> > > > >>>>>>>
> > > > >>>>>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>> This is pretty cool! I'll be there too. (unless the hangout
> > gets
> > > > too
> > > > >>>>>>>>
> > > > >>>>>>> full
> > > > >>>>>>
> > > > >>>>>>> -- if so, I'll drop out in favor of others who aren't lucky
> > > enough
> > > > >> to
> > > > >>>>>>>>
> > > > >>>>>>> get
> > > > >>>>>>
> > > > >>>>>>> to talk to Eugene all the time.)
> > > > >>>>>>>>
> > > > >>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <
> > > > >>>>>>>>
> > > > >>>>>>> [email protected]>
> > > > >>>>>>
> > > > >>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>> +1 I'll join
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <
> > > > >>>>>>>>>
> > > > >>>>>>>> [email protected]
> > > > >>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>> + 1, me2
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]
> > > > >>>>>>>>>>
> > > > >>>>>>>>> <javascript:;>>
> > > > >>>>>>
> > > > >>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> +1 as in I'll join ;-)
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>> <[email protected]
> > > > >>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Sounds good, thanks!
> > > > >>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
> > > > >>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google
> .
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> com/splittabledofn
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> [email protected]
> > > > >>>>>>>>>
> > > > >>>>>>>>>> <javascript:;>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Hi
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What
> > > about
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>> Friday
> > > > >>>>>>>>
> > > > >>>>>>>>> 19th ?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Regards
> > > > >>>>>>>>>>>>> JB
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > > > >>>>>>>>>>>>> <[email protected]> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi JB,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Sounds great, does the suggested time over
> > videoconference
> > > > >> work
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> for
> > > > >>>>>>>>
> > > > >>>>>>>>> you?
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré
> <
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> [email protected] <javascript:;>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi Eugene
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> May we talk together next week ? I like the
> proposal. I
> > > > >> would
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> just
> > > > >>>>>>>>>
> > > > >>>>>>>>>> need
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> some details for my understanding.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks
> > > > >>>>>>>>>>>>>>> Regards
> > > > >>>>>>>>>>>>>>> JB
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > > > >>>>>>>>>>>>>>> <[email protected]> wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi JB,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> What are your thoughts on this?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to
> > explain
> > > > >> more
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> about
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> this
> > > > >>>>>>>>>>>>>>>> proposal if necessary, since I understand it is a
> lot
> > to
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> digest.
> > > > >>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over
> > > > >>> Hangouts?
> > > > >>>>>>>>>>>>>>>> (link:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> https://staging.talkgadget.google.com/hangouts/_/google.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>> com/splittabledofn
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> -
> > > > >>>>>>>>>>>>>>>> I confirmed that it can be joined without being
> logged
> > > > >> into a
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Google
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> account)
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Who'd be interested in attending, and does this
> > > time/date
> > > > >>> work
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> for
> > > > >>>>>>>>>
> > > > >>>>>>>>>> people?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> <[email protected] <javascript:;>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> It sounds like you are concerned about continued
> > > support
> > > > >> for
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> existing
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> IO's
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> people have developed, and about backward
> > > compatibility?
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all
> > > existing
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Source-based
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> connectors will continue to work [though the
> document
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> proposes
> > > > >>>>>>>>
> > > > >>>>>>>>> at
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> some
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> point to make Read.from(Source) to translate to a
> > > wrapper
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> SDF
> > > > >>>>>>>>
> > > > >>>>>>>>> under
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> hood, to exercise the feature more and to make sure
> > > that
> > > > >> it
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> is
> > > > >>>>>>>>
> > > > >>>>>>>>> strictly
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> more powerful - but this is an optional
> > implementation
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> detail].
> > > > >>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly -
> > > > >> "replacing
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Source
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> API": a better phrasing would be "introducing a new
> > API
> > > > so
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> powerful
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> easy-to-use that hopefully people will choose it
> over
> > > the
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Source
> > > > >>>>>>>>>
> > > > >>>>>>>>>> API
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> all
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> the time, even though they don't have to" :) And we
> > can
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> discuss
> > > > >>>>>>>>>
> > > > >>>>>>>>>> whether or
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> not to actually deprecate/remove the Source API at
> > some
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> point
> > > > >>>>>>>>
> > > > >>>>>>>>> down
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> road, once it becomes clear whether this is the
> case
> > or
> > > > >> not.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> To give more context: this proposal came out of
> > > > >> discussions
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> within
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> the SDK
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam
> > project
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> existed,
> > > > >>>>>>>>>
> > > > >>>>>>>>>> on
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> how to
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> make major improvements to the Source API; perhaps
> it
> > > > will
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> clarify
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> things
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> if I give a history of the ideas discussed:
> > > > >>>>>>>>>>>>>>>>> - The first idea was to introduce a
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Read.from(PCollection<Source>)
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> transform while keeping the Source API intact - this,
> given
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> appropriate
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> implementation, would solve most of the scalability
> > and
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> composability
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> issues of IO's. Then most connectors would look like
> :
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> ParDo<A,
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Source<B>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> + Read.from().
> > > > >>>>>>>>>>>>>>>>> - Then we figured that the Source class is an
> > > unnecessary
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> abstraction, as
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> it simply holds data. What if we only had a
> Reader<S,
> > > B>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> class
> > > > >>>>>>>>
> > > > >>>>>>>>> where
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> S is
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> the source type and B the output type? Then
> > connectors
> > > > >> would
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> be
> > > > >>>>>>>>>
> > > > >>>>>>>>>> something
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical
> Read.using(Reader<S,
> > > > B>).
> > > > >>>>>>>>>>>>>>>>> - Then somebody remarked that some of the features
> of
> > > > >> Source
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> are
> > > > >>>>>>>>>
> > > > >>>>>>>>>> useful to
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress
> when
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> processing a
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> very
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> heavy element, or ability to produce very large
> > output
> > > in
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> parallel.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> - The two previous bullets were already hinting that
> > the
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Read.using()
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> primitive might not be so special: it just takes S
> and
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> produces
> > > > >>>>>>>>>
> > > > >>>>>>>>>> B:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> isn't
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> that what a ParDo does, plus some source magic,
> minus
> > > the
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> convenience
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> c.output() vs. the start/advance() state machine?
> > > > >>>>>>>>>>>>>>>>> - At this point it became clear that we should
> > explore
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> unifying
> > > > >>>>>>>>>
> > > > >>>>>>>>>> sources
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic
> of
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> sources
> > > > >>>>>>>>
> > > > >>>>>>>>> to
> > > > >>>>>>>>>
> > > > >>>>>>>>>> ParDo's
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> but without the limitations and coding
> > inconveniences?
> > > > And
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> this
> > > > >>>>>>>>>
> > > > >>>>>>>>>> is
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> how
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic to a
> > > DoFn
> > > > >> by
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> providing
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> a
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> RangeTracker.
> > > > >>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it
> > > > became
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> clear
> > > > >>>>>>>>>
> > > > >>>>>>>>>> that
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> is strictly more general than sources; at least, in
> > the
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> respect
> > > > >>>>>>>>>
> > > > >>>>>>>>>> that
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> sources have to produce output, while DoFn's don't:
> an
> > > SDF
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> may
> > > > >>>>>>>>
> > > > >>>>>>>>> very
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> well
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> produce no output at all, and simply perform a side
> > > > effect
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> in
> > > > >>>>>>>>
> > > > >>>>>>>>> a
> > > > >>>>>>>>>
> > > > >>>>>>>>>> parallel/resumable way.
> > > > >>>>>>>>>>>>>>>>> - Then there were countless hours of discussions on
> > > > >> unifying
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>
> > > > >>>>>>>>>> bounded/unbounded cases, on the particulars of
> RangeTracker
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> APIs
> > > > >>>>>>>>>
> > > > >>>>>>>>>> reconciling parallelization and checkpointing, what the
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> relation
> > > > >>>>>>>>>
> > > > >>>>>>>>>> between
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the
> > > current
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> proposal.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> The
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> proposal comes at a time when a couple of key
> > > ingredients
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> are
> > > > >>>>>>>>
> > > > >>>>>>>>> (almost)
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular
> DoFn,
> > > and
> > > > >> the
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> State/Timers
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> proposal to enable unbounded work per element.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> To put it shortly:
> > > > >>>>>>>>>>>>>>>>> - Yes, we will support existing Source connectors,
> > and
> > > > >> will
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> support
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> writing new ones, possibly forever. There is no
> > > > interference
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> with
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> current
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> users of Source.
> > > > >>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source
> > API,
> > > > >> taken
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> to
> > > > >>>>>>>>>
> > > > >>>>>>>>>> its
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> logical limit where it turns out that users' goals
> can
> > be
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> accomplished
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> easier and more generically entirely within
> ParDo's.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Let me know what you think, and thanks again!
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> <[email protected] <javascript:;>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Hi Eugene,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Just a question: why is it in DoFn and note an
> > > > >> improvement
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> of
> > > > >>>>>>>>
> > > > >>>>>>>>> Source
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> ?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will
> > have
> > > to
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> refactore
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> all
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> existing IO: basically, what you propose is to
> remove
> > > all
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Source
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> replace with NewDoFn.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> I'm concern with this approach, especially in term
> > of
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> timing:
> > > > >>>>>>>>
> > > > >>>>>>>>> clearly,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> the IO is the area where we have to move forward in
> > > Beam
> > > > >> as
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> it
> > > > >>>>>>>>>
> > > > >>>>>>>>>> will
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> allow new users to start in their projects.
> > > > >>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS,
> > > Cassandra,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> MongoDB,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> JDBC,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> ... and some people started to learn the IO API
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> (Bounded/Unbouded
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> source, etc).
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO
> > API
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> (Source)
> > > > >>>>>>>>>
> > > > >>>>>>>>>> instead
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> of introducing a NewDoFn.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> What are your thoughts for IO writer like me ? ;)
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Regards
> > > > >>>>>>>>>>>>>>>>>> JB
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Hello Beam community,
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw)
> would
> > > > like
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> to
> > > > >>>>>>>>
> > > > >>>>>>>>> propose
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn,
> > > which
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> allows
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> processing
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> checkpointable
> > > > >>>>>>>>>
> > > > >>>>>>>>>> and
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded amount
> of
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> work
> > > > >>>>>>>>>
> > > > >>>>>>>>>> per
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> element.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> This allows effectively replacing the current
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Bounded/UnboundedSource
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> APIs
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more
> > > scalable
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> and
> > > > >>>>>>>>
> > > > >>>>>>>>> composable
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> with
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> the rest of the Beam programming model, and
> enables
> > > > many
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> use
> > > > >>>>>>>>>
> > > > >>>>>>>>>> cases
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> were previously difficult or impossible, as well as
> > > some
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> non-obvious new
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> use cases.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA
> > > > >> [BEAM-65]
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> and
> > > > >>>>>>>>>
> > > > >>>>>>>>>> some
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Beam
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> meetings, and now the whole thing is written up in
> a
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> document:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>         https://s.apache.org/splittable-do-fn
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Here are some things that become possible with
> > > > >> Splittable
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> DoFn:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> - Efficiently read a filepattern matching millions of
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> files
> > > > >>>>>>>>
> > > > >>>>>>>>> - Read a collection of files that are produced by an
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> earlier
> > > > >>>>>>>>>
> > > > >>>>>>>>>> step
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> in the
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a
> > > storage
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> system
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> that can
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> export itself to files)
> > > > >>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> partitions"
> > > > >>>>>>>>
> > > > >>>>>>>>> DoFn
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> with a
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new
> > > records
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> in
> > > > >>>>>>>>>
> > > > >>>>>>>>>> a
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> while()
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> loop
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> incrementally
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> returns
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> new
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> files in a directory and a DoFn that tails a file
> > > > >>>>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common"
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> algorithm
> > > > >>>>>>>>
> > > > >>>>>>>>> (matrix
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> squaring) with good work balancing
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical
> Kafka
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> reader
> > > > >>>>>>>>
> > > > >>>>>>>>> written
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> against
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> this API:
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>     ProcessContinuation processElement(
> > > > >>>>>>>>>>>>>>>>>>>             ProcessContext context,
> > > OffsetRangeTracker
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> tracker)
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> {
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>       try (KafkaConsumer<String, String> consumer =
> > > > >>>>>>>>>>>>>>>>>>>
> > > >  Kafka.subscribe(context.element().topic,
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>  context.element().partition)) {
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>         consumer.seek(tracker.start());
> > > > >>>>>>>>>>>>>>>>>>>         while (true) {
> > > > >>>>>>>>>>>>>>>>>>>           ConsumerRecords<String, String>
> records =
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> consumer.poll(100ms);
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>           if (records == null) return done();
> > > > >>>>>>>>>>>>>>>>>>>           for (ConsumerRecord<String, String>
> > record
> > > :
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> records)
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> {
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>             if (!tracker.tryClaim(record.offset())) {
> > > > >>>>>>>>>>>>>>>>>>>               return
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > resume().withFutureOutputWatermark(record.timestamp());
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>             }
> > > > >>>>>>>>>>>>>>>>>>>             context.output(record);
> > > > >>>>>>>>>>>>>>>>>>>           }
> > > > >>>>>>>>>>>>>>>>>>>         }
> > > > >>>>>>>>>>>>>>>>>>>       }
> > > > >>>>>>>>>>>>>>>>>>>     }
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> The document describes in detail the motivations
> > > behind
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> this
> > > > >>>>>>>>>
> > > > >>>>>>>>>> feature,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> basic idea and API, open questions, and outlines
> an
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> incremental
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> delivery
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> plan.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based
> new
> > > > DoFn
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> [new-do-fn]
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> and is
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> loosely related to "State and Timers for DoFn"
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> [beam-state].
> > > > >>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Please take a look and comment!
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Thanks.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> [BEAM-65]
> > > > https://issues.apache.org/jira/browse/BEAM-65
> > > > >>>>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
> > > > >>>>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> --
> > > > >>>>>>>>>>>>>>>>>> Jean-Baptiste Onofré
> > > > >>>>>>>>>>>>>>>>>> [email protected] <javascript:;>
> > > > >>>>>>>>>>>>>>>>>> http://blog.nanthrax.net
> > > > >>>>>>>>>>>>>>>>>> Talend - http://www.talend.com
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> --
> > > > >>>>>>>>> Thanks,
> > > > >>>>>>>>> Andrew
> > > > >>>>>>>>>
> > > > >>>>>>>>> Subscribe to my book: Streaming Data <
> > > http://manning.com/psaltis
> > > > >
> > > > >>>>>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > > > >>>>>>>>> twiiter: @itmdata <
> > > > >>> http://twitter.com/intent/user?screen_name=itmdata
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>> --
> > > > >>>>>> Jean-Baptiste Onofré
> > > > >>>>>> [email protected]
> > > > >>>>>> http://blog.nanthrax.net
> > > > >>>>>> Talend - http://www.talend.com
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>> --
> > > > >>>> Jean-Baptiste Onofré
> > > > >>>> [email protected]
> > > > >>>> http://blog.nanthrax.net
> > > > >>>> Talend - http://www.talend.com
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > > --
> > > > Jean-Baptiste Onofré
> > > > [email protected]
> > > > http://blog.nanthrax.net
> > > > Talend - http://www.talend.com
> > > >
> > >
> >
>

Reply via email to