Hi,
I have another question about this: currently, unbounded sources have
special logic for determining the watermark and the system periodically
asks the sources for the current watermark. As I understood it, watermarks
are only "generated" at the sources. How will this work when sources are
implemented as a combination of DoFns and SplittableDoFns? Will
SplittableDoFns be asked for a watermark, does this mean that watermarks
can then be "generated" at any operation?

Cheers,
Aljoscha

On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <[email protected]>
wrote:

> Hi JB,
>
> Yes, I'm assuming you're referring to the "magic" part on the transform
> expansion diagram. This is indeed runner-specific, and timers+state are
> likely the simplest way to do this for an SDF that does unbounded amount of
> work.
>
> On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <[email protected]>
> wrote:
>
> > Anyway, from a runner perspective, we will have kind of API (part of the
> > Runner API) to "orchestrate" the SDF as we discussed during the call,
> > right ?
> >
> > Regards
> > JB
> >
> > On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
> > > Hi Aljoscha,
> > > This is an excellent question! And the answer is, we don't need any new
> > > concepts like "SDF executor" and can rely on the per-key state and
> timers
> > > machinery that already exists in all runners because it's necessary to
> > > implement windowing/triggering properly.
> > >
> > > Note that this is already somewhat addressed in the previously posted
> > State
> > > and Timers proposal https://s.apache.org/beam-state , under "per-key
> > > workflows".
> > >
> > > Think of it this way, using the Kafka example: we'll expand it into a
> > > transform:
> > >
> > > (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for
> > > partition in topic.listPartitions() }
> > > (2) GroupByKey
> > > (3) ParDo { key, topic, partition, R -> Kafka reader code in the
> > > proposal/slides }
> > >   - R is the OffsetRange restriction which in this case will be always
> of
> > > the form [startOffset, inf).
> > >   - there'll be just 1 value per key, but we use GBK to just get access
> > to
> > > the per-key state/timers machinery. This may be runner-specific; maybe
> > some
> > > runners don't need a GBK to do that.
> > >
> > > Now suppose the topic has two partitions, P1 and P2, and they get
> > assigned
> > > unique keys K1, K2.
> > > Then the input to (3) will be a collection of: (K1, topic, P1, [0,
> inf)),
> > > (K2, topic, P2, [0, inf)).
> > > Suppose we have just 1 worker with just 1 thread. Now, how will this
> > thread
> > > be able to produce elements from both P1 and P2? here's how.
> > >
> > > The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
> > > certain time or after a certain number of elements are output (just
> like
> > > with the current UnboundedSource reading code) producing a residual
> > > restriction R1' (basically a new start timestamp), put R11 into the
> > per-key
> > > state and set a timer T1 to resume.
> > > Then it will process (K2, topic, P2, [0, inf)), do the same producing a
> > > residual restriction R2' and setting a timer T2 to resume.
> > > Then timer T1 will fire in the context of the key K1. The thread will
> > call
> > > processElement again, this time supplying R1' as the restriction; the
> > > process repeats and after a while it checkpoints and stores R1'' into
> > state
> > > of K1.
> > > Then timer T2 will fire in the context of K2, run processElement for a
> > > while, set a new timer and store R2'' into the state of K2.
> > > Etc.
> > > If partition 1 goes away, the processElement call will return "do not
> > > resume", so a timer will not be set and instead the state associated
> with
> > > K1 will be GC'd.
> > >
> > > So basically it's almost like cooperative thread scheduling: things run
> > for
> > > a while, until the runner tells them to checkpoint, then they set a
> timer
> > > to resume themselves, and the runner fires the timers, and the process
> > > repeats. And, again, this only requires things that runners can already
> > do
> > > - state and timers, but no new concept of SDF executor (and
> consequently
> > no
> > > necessity to choose/tune how many you need).
> > >
> > > Makes sense?
> > >
> > > On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <[email protected]>
> > > wrote:
> > >
> > >> Hi,
> > >> I have another question that I think wasn't addressed in the meeting.
> At
> > >> least it wasn't mentioned in the notes.
> > >>
> > >> In the context of replacing sources by a combination of to SDFs, how
> do
> > you
> > >> determine how many "SDF executor" instances you need downstream? For
> the
> > >> sake of argument assume that both SDFs are executed with parallelism 1
> > (or
> > >> one per worker). Now, if you have a file source that reads from a
> static
> > >> set of files the first SDF would emit the filenames while the second
> SDF
> > >> would receive the filenames and emit their contents. This works well
> and
> > >> the downstream SDF can process one filename after the other. Now,
> think
> > of
> > >> something like a Kafka source. The first SDF would emit the partitions
> > (say
> > >> 4 partitions, in this example) and the second SDF would be responsible
> > for
> > >> reading from a topic and emitting elements. Reading from one topic
> never
> > >> finishes so you can't process the topics in series. I think you would
> > need
> > >> to have 4 downstream "SDF executor" instances. The question now is:
> how
> > do
> > >> you determine whether you are in the first or the second situation?
> > >>
> > >> Probably I'm just overlooking something and this is already dealt with
> > >> somewhere... :-)
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]> wrote:
> > >>
> > >>> Hello,
> > >>>
> > >>> Thanks for the notes both Dan and Eugene, and for taking the time to
> do
> > >> the
> > >>> presentation and  answer our questions.
> > >>>
> > >>> I mentioned the ongoing work on dynamic scaling on Flink because I
> > >> suppose
> > >>> that it will address dynamic rebalancing eventually (there are
> multiple
> > >>> changes going on for dynamic scaling).
> > >>>
> > >>>
> > >>>
> > >>
> >
> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
> > >>>
> > >>>
> https://lists.apache.org/[email protected]:lte=1M:FLIP-8
> > >>>
> > >>> Anyway I am far from an expert on flink, but probably the flink guys
> > can
> > >>> give their opinion about this and refer to a more precise document
> that
> > >> the
> > >>> ones I mentioned..
> > >>>
> > >>> ​Thanks again,
> > >>> Ismaël​
> > >>>
> > >>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <
> [email protected]
> > >
> > >>> wrote:
> > >>>
> > >>>> Great summary Eugene and Dan.
> > >>>>
> > >>>> And thanks again for the details, explanation, and discussion.
> > >>>>
> > >>>> Regards
> > >>>> JB
> > >>>>
> > >>>>
> > >>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
> > >>>>
> > >>>>> Thanks for attending, everybody!
> > >>>>>
> > >>>>> Here are meeting notes (thanks Dan!).
> > >>>>>
> > >>>>> Q: Will SplittableDoFn enable better repartitioning of the
> > >> input/output
> > >>>>> data?
> > >>>>> A: Not really; repartitioning is orthogonal to SDF.
> > >>>>>
> > >>>>> Current Source API suffers from lack of composition and scalability
> > >>>>> because
> > >>>>> we treat sources too much as metadata, not enough as data.
> > >>>>>
> > >>>>> Q(slide with transform expansion): who does the "magic"?
> > >>>>> A: The runner. Checkpointing and dynamically splitting restrictions
> > >> will
> > >>>>> require collaboration with the runner.
> > >>>>>
> > >>>>> Q: How does the runner interact with the DoFn to control the
> > >>> restrictions?
> > >>>>> Is it related to the centralized job tracker etc.?
> > >>>>> A: RestrictionTracker is a simple helper object, that exists purely
> > on
> > >>> the
> > >>>>> worker while executing a single partition, and interacts with the
> > >> worker
> > >>>>> harness part of the runner. Not to be confused with the centralized
> > >> job
> > >>>>> tracker (master) - completely unrelated. Worker harness, of course,
> > >>>>> interacts with the master in some relevant ways (e.g. Dataflow
> master
> > >>> can
> > >>>>> tell "you're a straggler, you should split").
> > >>>>>
> > >>>>> Q: Is this a new DoFn subclass, or how will this integrate with the
> > >>>>> existing code?
> > >>>>> A: It's a feature of reflection-based DoFn (
> > >>> https://s.apache.org/a-new-do
> > >>>>> fn)
> > >>>>> - just another optional parameter of type RestrictionTracker to
> > >>>>> processElement() which is dynamically bound via reflection, so
> fully
> > >>>>> backward/forward compatible, and looks to users like a regular
> DoFn.
> > >>>>>
> > >>>>> Q: why is fractionClaimed a double?
> > >>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
> > >>>>> rebalancing) requires a uniform way to represent progress through
> > >>>>> different
> > >>>>> sources.
> > >>>>>
> > >>>>> Q: Spark runner is microbatch-based, so this seems to map well onto
> > >>>>> checkpoint/resume, right?
> > >>>>> A: Yes; actually the Dataflow runner is, at a worker level, also
> > >>>>> microbatch-based. The way SDF interacts with a runner will be very
> > >>> similar
> > >>>>> to how a Bounded/UnboundedSource interacts with a runner.
> > >>>>>
> > >>>>> Q: Using SDF, what would be the "packaging" of the IO?
> > >>>>> A: Same as currently: package IO's as PTransforms and their
> > >>> implementation
> > >>>>> under the hood can be anything: Source, simple ParDo's, SDF, etc.
> > E.g.
> > >>>>> Datastore was recently refactored from BoundedSource to ParDo
> (ended
> > >> up
> > >>>>> simpler and more scalable), transparently to users.
> > >>>>>
> > >>>>> Q: What's the timeline; what to do with the IOs currently in
> > >>> development?
> > >>>>> A: Timeline is O(months). Keep doing what you're doing and working
> on
> > >>> top
> > >>>>> of Source APIs when necessary and simple ParDo's otherwise.
> > >>>>>
> > >>>>> Q: What's the impact for the runner writers?
> > >>>>> A: Tentatively expected that most of the code for running an SDF
> will
> > >> be
> > >>>>> common to runners, with some amount of per-runner glue code, just
> > like
> > >>>>> GBK/windowing/triggering. Impact on Dataflow runner is larger since
> > it
> > >>>>> supports dynamic rebalancing in batch mode and this is the hardest
> > >> part,
> > >>>>> but for other runners shouldn't be too hard.
> > >>>>>
> > >>>>> JB: Talend has people who can help with this: e.g. help integrate
> > into
> > >>>>> Spark runner, refactor IOs etc. Amit also willing to chat about
> > >>> supporting
> > >>>>> SDF in Spark runner.
> > >>>>>
> > >>>>> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael
> > >> will
> > >>>>> send a link.
> > >>>>>
> > >>>>>
> > >>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <
> > [email protected]
> > >>>
> > >>>>> wrote:
> > >>>>>
> > >>>>> Hi Eugene,
> > >>>>>>
> > >>>>>> thanks for the reminder.
> > >>>>>>
> > >>>>>> Just to prepare some topics for the call, please find some points:
> > >>>>>>
> > >>>>>> 1. Using SDF, what would be the "packaging" of the IO ? It sounds
> to
> > >> me
> > >>>>>> that we can keep the IO packaging style (using with* setters for
> the
> > >> IO
> > >>>>>> configuration) and replace PTransform, Source, Reader, ...
> directly
> > >>> with
> > >>>>>> SDF. Correct ?
> > >>>>>>
> > >>>>>> 2. What's your plan in term of release to include SDF ? We have
> > >> several
> > >>>>>> IOs in preparation and I wonder if it's worth to start to use the
> > new
> > >>>>>> SDF API or not.
> > >>>>>>
> > >>>>>> 3. What's the impact for the runner writers ? The runners will
> have
> > >> to
> > >>>>>> support SDF, that could be tricky depending of the execution
> engine.
> > >> In
> > >>>>>> the worst case where the runner can't fully support SDF, does it
> > mean
> > >>>>>> that most of our IOs will be useless ?
> > >>>>>>
> > >>>>>> Just my dumb topics ;)
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> See you at 8am !
> > >>>>>>
> > >>>>>> Regards
> > >>>>>> JB
> > >>>>>>
> > >>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
> > >>>>>>
> > >>>>>>> Hello everybody,
> > >>>>>>>
> > >>>>>>> Just a reminder:
> > >>>>>>>
> > >>>>>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST,
> > to
> > >>>>>>> join
> > >>>>>>> the call go to
> > >>>>>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn
> .
> > >>>>>>> I intend to go over the proposed design and then have a free-form
> > >>>>>>> discussion.
> > >>>>>>>
> > >>>>>>> Please have a skim through the proposal doc:
> https://s.apache.org/
> > >>>>>>> splittable-do-fn
> > >>>>>>> I also made some slides that are basically a trimmed-down version
> > of
> > >>> the
> > >>>>>>> doc to use as a guide when conducting the meeting,
> > >>>>>>>
> > >>>>>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047
> > >>>>>> Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
> > >>>>>>
> > >>>>>>> .
> > >>>>>>>
> > >>>>>>> I will post notes from the meeting on this thread afterwards.
> > >>>>>>>
> > >>>>>>> Thanks, looking forward.
> > >>>>>>>
> > >>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin
> > >>>>>>> <[email protected]
> > >>>>>>>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>> This is pretty cool! I'll be there too. (unless the hangout gets
> > too
> > >>>>>>>>
> > >>>>>>> full
> > >>>>>>
> > >>>>>>> -- if so, I'll drop out in favor of others who aren't lucky
> enough
> > >> to
> > >>>>>>>>
> > >>>>>>> get
> > >>>>>>
> > >>>>>>> to talk to Eugene all the time.)
> > >>>>>>>>
> > >>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <
> > >>>>>>>>
> > >>>>>>> [email protected]>
> > >>>>>>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>> +1 I'll join
> > >>>>>>>>>
> > >>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <
> > >>>>>>>>>
> > >>>>>>>> [email protected]
> > >>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>> + 1, me2
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]
> > >>>>>>>>>>
> > >>>>>>>>> <javascript:;>>
> > >>>>>>
> > >>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>> +1 as in I'll join ;-)
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov
> > >>>>>>>>>>>
> > >>>>>>>>>> <[email protected]
> > >>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>> Sounds good, thanks!
> > >>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
> > >>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.
> > >>>>>>>>>>>>
> > >>>>>>>>>>> com/splittabledofn
> > >>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <
> > >>>>>>>>>>>>
> > >>>>>>>>>>> [email protected]
> > >>>>>>>>>
> > >>>>>>>>>> <javascript:;>>
> > >>>>>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Hi
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What
> about
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>> Friday
> > >>>>>>>>
> > >>>>>>>>> 19th ?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Regards
> > >>>>>>>>>>>>> JB
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > >>>>>>>>>>>>> <[email protected]> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi JB,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Sounds great, does the suggested time over videoconference
> > >> work
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> for
> > >>>>>>>>
> > >>>>>>>>> you?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> [email protected] <javascript:;>>
> > >>>>>>>>>>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Eugene
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> May we talk together next week ? I like the proposal. I
> > >> would
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> just
> > >>>>>>>>>
> > >>>>>>>>>> need
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> some details for my understanding.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks
> > >>>>>>>>>>>>>>> Regards
> > >>>>>>>>>>>>>>> JB
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > >>>>>>>>>>>>>>> <[email protected]> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi JB,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> What are your thoughts on this?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain
> > >> more
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> about
> > >>>>>>>>>>
> > >>>>>>>>>>> this
> > >>>>>>>>>>>>>>>> proposal if necessary, since I understand it is a lot to
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> digest.
> > >>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over
> > >>> Hangouts?
> > >>>>>>>>>>>>>>>> (link:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.
> > >>>>>>>>>>>>
> > >>>>>>>>>>> com/splittabledofn
> > >>>>>>>>>>
> > >>>>>>>>>>> -
> > >>>>>>>>>>>>>>>> I confirmed that it can be joined without being logged
> > >> into a
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Google
> > >>>>>>>>>>
> > >>>>>>>>>>> account)
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Who'd be interested in attending, and does this
> time/date
> > >>> work
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> for
> > >>>>>>>>>
> > >>>>>>>>>> people?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> <[email protected] <javascript:;>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> It sounds like you are concerned about continued
> support
> > >> for
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> IO's
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> people have developed, and about backward
> compatibility?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all
> existing
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Source-based
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> connectors will continue to work [though the document
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> proposes
> > >>>>>>>>
> > >>>>>>>>> at
> > >>>>>>>>>>
> > >>>>>>>>>>> some
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> point to make Read.from(Source) to translate to a
> wrapper
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> SDF
> > >>>>>>>>
> > >>>>>>>>> under
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> hood, to exercise the feature more and to make sure
> that
> > >> it
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>
> > >>>>>>>>> strictly
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> more powerful - but this is an optional implementation
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> detail].
> > >>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly -
> > >> "replacing
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>
> > >>>>>>>>>> Source
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> API": a better phrasing would be "introducing a new API
> > so
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> powerful
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> easy-to-use that hopefully people will choose it over
> the
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Source
> > >>>>>>>>>
> > >>>>>>>>>> API
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> the time, even though they don't have to" :) And we can
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> discuss
> > >>>>>>>>>
> > >>>>>>>>>> whether or
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> not to actually deprecate/remove the Source API at some
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> point
> > >>>>>>>>
> > >>>>>>>>> down
> > >>>>>>>>>>
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> road, once it becomes clear whether this is the case or
> > >> not.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> To give more context: this proposal came out of
> > >> discussions
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> within
> > >>>>>>>>>>
> > >>>>>>>>>>> the SDK
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam project
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> existed,
> > >>>>>>>>>
> > >>>>>>>>>> on
> > >>>>>>>>>>
> > >>>>>>>>>>> how to
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> make major improvements to the Source API; perhaps it
> > will
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> clarify
> > >>>>>>>>>>
> > >>>>>>>>>>> things
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> if I give a history of the ideas discussed:
> > >>>>>>>>>>>>>>>>> - The first idea was to introduce a
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Read.from(PCollection<Source>)
> > >>>>>>>>>>
> > >>>>>>>>>>> transform while keeping the Source API intact - this, given
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> appropriate
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> implementation, would solve most of the scalability and
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> composability
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> issues of IO's. Then most connectors would look like :
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> ParDo<A,
> > >>>>>>>>>
> > >>>>>>>>>> Source<B>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> + Read.from().
> > >>>>>>>>>>>>>>>>> - Then we figured that the Source class is an
> unnecessary
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> abstraction, as
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> it simply holds data. What if we only had a Reader<S,
> B>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> class
> > >>>>>>>>
> > >>>>>>>>> where
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> S is
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> the source type and B the output type? Then connectors
> > >> would
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> be
> > >>>>>>>>>
> > >>>>>>>>>> something
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical Read.using(Reader<S,
> > B>).
> > >>>>>>>>>>>>>>>>> - Then somebody remarked that some of the features of
> > >> Source
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> are
> > >>>>>>>>>
> > >>>>>>>>>> useful to
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress when
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> processing a
> > >>>>>>>>>>
> > >>>>>>>>>>> very
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> heavy element, or ability to produce very large output
> in
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> parallel.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> - The two previous bullets were already hinting that the
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Read.using()
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> primitive might not be so special: it just takes S and
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> produces
> > >>>>>>>>>
> > >>>>>>>>>> B:
> > >>>>>>>>>>
> > >>>>>>>>>>> isn't
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> that what a ParDo does, plus some source magic, minus
> the
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> convenience
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> of
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> c.output() vs. the start/advance() state machine?
> > >>>>>>>>>>>>>>>>> - At this point it became clear that we should explore
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> unifying
> > >>>>>>>>>
> > >>>>>>>>>> sources
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic of
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> sources
> > >>>>>>>>
> > >>>>>>>>> to
> > >>>>>>>>>
> > >>>>>>>>>> ParDo's
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> but without the limitations and coding inconveniences?
> > And
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> this
> > >>>>>>>>>
> > >>>>>>>>>> is
> > >>>>>>>>>>
> > >>>>>>>>>>> how
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic to a
> DoFn
> > >> by
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> providing
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> a
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> RangeTracker.
> > >>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it
> > became
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> clear
> > >>>>>>>>>
> > >>>>>>>>>> that
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> it
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> is strictly more general than sources; at least, in the
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> respect
> > >>>>>>>>>
> > >>>>>>>>>> that
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> sources have to produce output, while DoFn's don't: an
> SDF
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> may
> > >>>>>>>>
> > >>>>>>>>> very
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> well
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> produce no output at all, and simply perform a side
> > effect
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> in
> > >>>>>>>>
> > >>>>>>>>> a
> > >>>>>>>>>
> > >>>>>>>>>> parallel/resumable way.
> > >>>>>>>>>>>>>>>>> - Then there were countless hours of discussions on
> > >> unifying
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> the
> > >>>>>>>>>
> > >>>>>>>>>> bounded/unbounded cases, on the particulars of RangeTracker
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> APIs
> > >>>>>>>>>
> > >>>>>>>>>> reconciling parallelization and checkpointing, what the
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> relation
> > >>>>>>>>>
> > >>>>>>>>>> between
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the
> current
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> proposal.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> The
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> proposal comes at a time when a couple of key
> ingredients
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> are
> > >>>>>>>>
> > >>>>>>>>> (almost)
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular DoFn,
> and
> > >> the
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> State/Timers
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> proposal to enable unbounded work per element.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> To put it shortly:
> > >>>>>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and
> > >> will
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> support
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> writing new ones, possibly forever. There is no
> > interference
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> with
> > >>>>>>>>>>
> > >>>>>>>>>>> current
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> users of Source.
> > >>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source API,
> > >> taken
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> to
> > >>>>>>>>>
> > >>>>>>>>>> its
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> logical limit where it turns out that users' goals can be
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> accomplished
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> easier and more generically entirely within ParDo's.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Let me know what you think, and thanks again!
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> <[email protected] <javascript:;>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi Eugene,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Just a question: why is it in DoFn and note an
> > >> improvement
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> of
> > >>>>>>>>
> > >>>>>>>>> Source
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> ?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will have
> to
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> refactore
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> all
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> existing IO: basically, what you propose is to remove
> all
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Source
> > >>>>>>>>>>
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> replace with NewDoFn.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I'm concern with this approach, especially in term of
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> timing:
> > >>>>>>>>
> > >>>>>>>>> clearly,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> the IO is the area where we have to move forward in
> Beam
> > >> as
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> it
> > >>>>>>>>>
> > >>>>>>>>>> will
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> allow new users to start in their projects.
> > >>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS,
> Cassandra,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> MongoDB,
> > >>>>>>>>>>
> > >>>>>>>>>>> JDBC,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> ... and some people started to learn the IO API
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> (Bounded/Unbouded
> > >>>>>>>>>>
> > >>>>>>>>>>> source, etc).
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> (Source)
> > >>>>>>>>>
> > >>>>>>>>>> instead
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> of introducing a NewDoFn.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> What are your thoughts for IO writer like me ? ;)
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Regards
> > >>>>>>>>>>>>>>>>>> JB
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hello Beam community,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would
> > like
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> to
> > >>>>>>>>
> > >>>>>>>>> propose
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn,
> which
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> allows
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> processing
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> checkpointable
> > >>>>>>>>>
> > >>>>>>>>>> and
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded amount of
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> work
> > >>>>>>>>>
> > >>>>>>>>>> per
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> element.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> This allows effectively replacing the current
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Bounded/UnboundedSource
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> APIs
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more
> scalable
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> and
> > >>>>>>>>
> > >>>>>>>>> composable
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> with
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> the rest of the Beam programming model, and enables
> > many
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> use
> > >>>>>>>>>
> > >>>>>>>>>> cases
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> were previously difficult or impossible, as well as
> some
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> non-obvious new
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> use cases.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA
> > >> [BEAM-65]
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> and
> > >>>>>>>>>
> > >>>>>>>>>> some
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Beam
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> meetings, and now the whole thing is written up in a
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> document:
> > >>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>         https://s.apache.org/splittable-do-fn
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Here are some things that become possible with
> > >> Splittable
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> DoFn:
> > >>>>>>>>>>
> > >>>>>>>>>>> - Efficiently read a filepattern matching millions of
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> files
> > >>>>>>>>
> > >>>>>>>>> - Read a collection of files that are produced by an
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> earlier
> > >>>>>>>>>
> > >>>>>>>>>> step
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> in the
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a
> storage
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> system
> > >>>>>>>>>>
> > >>>>>>>>>>> that can
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> export itself to files)
> > >>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> partitions"
> > >>>>>>>>
> > >>>>>>>>> DoFn
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> with a
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new
> records
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> in
> > >>>>>>>>>
> > >>>>>>>>>> a
> > >>>>>>>>>>
> > >>>>>>>>>>> while()
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> loop
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> incrementally
> > >>>>>>>>>>
> > >>>>>>>>>>> returns
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> new
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> files in a directory and a DoFn that tails a file
> > >>>>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common"
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> algorithm
> > >>>>>>>>
> > >>>>>>>>> (matrix
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> squaring) with good work balancing
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> reader
> > >>>>>>>>
> > >>>>>>>>> written
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> against
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> this API:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>     ProcessContinuation processElement(
> > >>>>>>>>>>>>>>>>>>>             ProcessContext context,
> OffsetRangeTracker
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> tracker)
> > >>>>>>>>>>
> > >>>>>>>>>>> {
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>       try (KafkaConsumer<String, String> consumer =
> > >>>>>>>>>>>>>>>>>>>
> >  Kafka.subscribe(context.element().topic,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>  context.element().partition)) {
> > >>>>>>>>>>
> > >>>>>>>>>>>         consumer.seek(tracker.start());
> > >>>>>>>>>>>>>>>>>>>         while (true) {
> > >>>>>>>>>>>>>>>>>>>           ConsumerRecords<String, String> records =
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> consumer.poll(100ms);
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>           if (records == null) return done();
> > >>>>>>>>>>>>>>>>>>>           for (ConsumerRecord<String, String> record
> :
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> records)
> > >>>>>>>>>>
> > >>>>>>>>>>> {
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>             if (!tracker.tryClaim(record.offset())) {
> > >>>>>>>>>>>>>>>>>>>               return
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> resume().withFutureOutputWatermark(record.timestamp());
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>             }
> > >>>>>>>>>>>>>>>>>>>             context.output(record);
> > >>>>>>>>>>>>>>>>>>>           }
> > >>>>>>>>>>>>>>>>>>>         }
> > >>>>>>>>>>>>>>>>>>>       }
> > >>>>>>>>>>>>>>>>>>>     }
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> The document describes in detail the motivations
> behind
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> this
> > >>>>>>>>>
> > >>>>>>>>>> feature,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> basic idea and API, open questions, and outlines an
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> incremental
> > >>>>>>>>>>
> > >>>>>>>>>>> delivery
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> plan.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new
> > DoFn
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> [new-do-fn]
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> and is
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> loosely related to "State and Timers for DoFn"
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> [beam-state].
> > >>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Please take a look and comment!
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> [BEAM-65]
> > https://issues.apache.org/jira/browse/BEAM-65
> > >>>>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
> > >>>>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>> Jean-Baptiste Onofré
> > >>>>>>>>>>>>>>>>>> [email protected] <javascript:;>
> > >>>>>>>>>>>>>>>>>> http://blog.nanthrax.net
> > >>>>>>>>>>>>>>>>>> Talend - http://www.talend.com
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Andrew
> > >>>>>>>>>
> > >>>>>>>>> Subscribe to my book: Streaming Data <
> http://manning.com/psaltis
> > >
> > >>>>>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > >>>>>>>>> twiiter: @itmdata <
> > >>> http://twitter.com/intent/user?screen_name=itmdata
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>> --
> > >>>>>> Jean-Baptiste Onofré
> > >>>>>> [email protected]
> > >>>>>> http://blog.nanthrax.net
> > >>>>>> Talend - http://www.talend.com
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>> --
> > >>>> Jean-Baptiste Onofré
> > >>>> [email protected]
> > >>>> http://blog.nanthrax.net
> > >>>> Talend - http://www.talend.com
> > >>>>
> > >>>
> > >>
> > >
> >
> > --
> > Jean-Baptiste Onofré
> > [email protected]
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>

Reply via email to