Hi everyone

Is there any repository where one can track all proposals, something like Flink 
does with this wiki [1]?

[1] 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals 
<https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals>

Thanks
Ovidiu

> On 29 Aug 2016, at 12:01, Jean-Baptiste Onofré <[email protected]> wrote:
> 
> Hi Aljoscha,
> 
> Indeed, it's something we discussed during our call.
> 
> AFAIU, it's one function of the tracker. When doing the tracker tryClaim 
> (with offset, partition id, or any kind of tracked "ID"), if the claim is not 
> possible, then we will update the watermark.
> 
> So the tracker is useful to determine the "split" and also to deal with 
> watermark.
> 
> Regards
> JB
> 
> On 08/29/2016 11:55 AM, Aljoscha Krettek wrote:
>> Hi,
>> I have another question about this: currently, unbounded sources have
>> special logic for determining the watermark and the system periodically
>> asks the sources for the current watermark. As I understood it, watermarks
>> are only "generated" at the sources. How will this work when sources are
>> implemented as a combination of DoFns and SplittableDoFns? Will
>> SplittableDoFns be asked for a watermark, does this mean that watermarks
>> can then be "generated" at any operation?
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov <[email protected]>
>> wrote:
>> 
>>> Hi JB,
>>> 
>>> Yes, I'm assuming you're referring to the "magic" part on the transform
>>> expansion diagram. This is indeed runner-specific, and timers+state are
>>> likely the simplest way to do this for an SDF that does unbounded amount of
>>> work.
>>> 
>>> On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <[email protected]>
>>> wrote:
>>> 
>>>> Anyway, from a runner perspective, we will have kind of API (part of the
>>>> Runner API) to "orchestrate" the SDF as we discussed during the call,
>>>> right ?
>>>> 
>>>> Regards
>>>> JB
>>>> 
>>>> On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
>>>>> Hi Aljoscha,
>>>>> This is an excellent question! And the answer is, we don't need any new
>>>>> concepts like "SDF executor" and can rely on the per-key state and
>>> timers
>>>>> machinery that already exists in all runners because it's necessary to
>>>>> implement windowing/triggering properly.
>>>>> 
>>>>> Note that this is already somewhat addressed in the previously posted
>>>> State
>>>>> and Timers proposal https://s.apache.org/beam-state , under "per-key
>>>>> workflows".
>>>>> 
>>>>> Think of it this way, using the Kafka example: we'll expand it into a
>>>>> transform:
>>>>> 
>>>>> (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for
>>>>> partition in topic.listPartitions() }
>>>>> (2) GroupByKey
>>>>> (3) ParDo { key, topic, partition, R -> Kafka reader code in the
>>>>> proposal/slides }
>>>>>  - R is the OffsetRange restriction which in this case will be always
>>> of
>>>>> the form [startOffset, inf).
>>>>>  - there'll be just 1 value per key, but we use GBK to just get access
>>>> to
>>>>> the per-key state/timers machinery. This may be runner-specific; maybe
>>>> some
>>>>> runners don't need a GBK to do that.
>>>>> 
>>>>> Now suppose the topic has two partitions, P1 and P2, and they get
>>>> assigned
>>>>> unique keys K1, K2.
>>>>> Then the input to (3) will be a collection of: (K1, topic, P1, [0,
>>> inf)),
>>>>> (K2, topic, P2, [0, inf)).
>>>>> Suppose we have just 1 worker with just 1 thread. Now, how will this
>>>> thread
>>>>> be able to produce elements from both P1 and P2? here's how.
>>>>> 
>>>>> The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
>>>>> certain time or after a certain number of elements are output (just
>>> like
>>>>> with the current UnboundedSource reading code) producing a residual
>>>>> restriction R1' (basically a new start timestamp), put R11 into the
>>>> per-key
>>>>> state and set a timer T1 to resume.
>>>>> Then it will process (K2, topic, P2, [0, inf)), do the same producing a
>>>>> residual restriction R2' and setting a timer T2 to resume.
>>>>> Then timer T1 will fire in the context of the key K1. The thread will
>>>> call
>>>>> processElement again, this time supplying R1' as the restriction; the
>>>>> process repeats and after a while it checkpoints and stores R1'' into
>>>> state
>>>>> of K1.
>>>>> Then timer T2 will fire in the context of K2, run processElement for a
>>>>> while, set a new timer and store R2'' into the state of K2.
>>>>> Etc.
>>>>> If partition 1 goes away, the processElement call will return "do not
>>>>> resume", so a timer will not be set and instead the state associated
>>> with
>>>>> K1 will be GC'd.
>>>>> 
>>>>> So basically it's almost like cooperative thread scheduling: things run
>>>> for
>>>>> a while, until the runner tells them to checkpoint, then they set a
>>> timer
>>>>> to resume themselves, and the runner fires the timers, and the process
>>>>> repeats. And, again, this only requires things that runners can already
>>>> do
>>>>> - state and timers, but no new concept of SDF executor (and
>>> consequently
>>>> no
>>>>> necessity to choose/tune how many you need).
>>>>> 
>>>>> Makes sense?
>>>>> 
>>>>> On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <[email protected]>
>>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> I have another question that I think wasn't addressed in the meeting.
>>> At
>>>>>> least it wasn't mentioned in the notes.
>>>>>> 
>>>>>> In the context of replacing sources by a combination of to SDFs, how
>>> do
>>>> you
>>>>>> determine how many "SDF executor" instances you need downstream? For
>>> the
>>>>>> sake of argument assume that both SDFs are executed with parallelism 1
>>>> (or
>>>>>> one per worker). Now, if you have a file source that reads from a
>>> static
>>>>>> set of files the first SDF would emit the filenames while the second
>>> SDF
>>>>>> would receive the filenames and emit their contents. This works well
>>> and
>>>>>> the downstream SDF can process one filename after the other. Now,
>>> think
>>>> of
>>>>>> something like a Kafka source. The first SDF would emit the partitions
>>>> (say
>>>>>> 4 partitions, in this example) and the second SDF would be responsible
>>>> for
>>>>>> reading from a topic and emitting elements. Reading from one topic
>>> never
>>>>>> finishes so you can't process the topics in series. I think you would
>>>> need
>>>>>> to have 4 downstream "SDF executor" instances. The question now is:
>>> how
>>>> do
>>>>>> you determine whether you are in the first or the second situation?
>>>>>> 
>>>>>> Probably I'm just overlooking something and this is already dealt with
>>>>>> somewhere... :-)
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> 
>>>>>> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]> wrote:
>>>>>> 
>>>>>>> Hello,
>>>>>>> 
>>>>>>> Thanks for the notes both Dan and Eugene, and for taking the time to
>>> do
>>>>>> the
>>>>>>> presentation and  answer our questions.
>>>>>>> 
>>>>>>> I mentioned the ongoing work on dynamic scaling on Flink because I
>>>>>> suppose
>>>>>>> that it will address dynamic rebalancing eventually (there are
>>> multiple
>>>>>>> changes going on for dynamic scaling).
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
>>>>>>> 
>>>>>>> 
>>> https://lists.apache.org/[email protected]:lte=1M:FLIP-8
>>>>>>> 
>>>>>>> Anyway I am far from an expert on flink, but probably the flink guys
>>>> can
>>>>>>> give their opinion about this and refer to a more precise document
>>> that
>>>>>> the
>>>>>>> ones I mentioned..
>>>>>>> 
>>>>>>> ​Thanks again,
>>>>>>> Ismaël​
>>>>>>> 
>>>>>>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <
>>> [email protected]
>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Great summary Eugene and Dan.
>>>>>>>> 
>>>>>>>> And thanks again for the details, explanation, and discussion.
>>>>>>>> 
>>>>>>>> Regards
>>>>>>>> JB
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
>>>>>>>> 
>>>>>>>>> Thanks for attending, everybody!
>>>>>>>>> 
>>>>>>>>> Here are meeting notes (thanks Dan!).
>>>>>>>>> 
>>>>>>>>> Q: Will SplittableDoFn enable better repartitioning of the
>>>>>> input/output
>>>>>>>>> data?
>>>>>>>>> A: Not really; repartitioning is orthogonal to SDF.
>>>>>>>>> 
>>>>>>>>> Current Source API suffers from lack of composition and scalability
>>>>>>>>> because
>>>>>>>>> we treat sources too much as metadata, not enough as data.
>>>>>>>>> 
>>>>>>>>> Q(slide with transform expansion): who does the "magic"?
>>>>>>>>> A: The runner. Checkpointing and dynamically splitting restrictions
>>>>>> will
>>>>>>>>> require collaboration with the runner.
>>>>>>>>> 
>>>>>>>>> Q: How does the runner interact with the DoFn to control the
>>>>>>> restrictions?
>>>>>>>>> Is it related to the centralized job tracker etc.?
>>>>>>>>> A: RestrictionTracker is a simple helper object, that exists purely
>>>> on
>>>>>>> the
>>>>>>>>> worker while executing a single partition, and interacts with the
>>>>>> worker
>>>>>>>>> harness part of the runner. Not to be confused with the centralized
>>>>>> job
>>>>>>>>> tracker (master) - completely unrelated. Worker harness, of course,
>>>>>>>>> interacts with the master in some relevant ways (e.g. Dataflow
>>> master
>>>>>>> can
>>>>>>>>> tell "you're a straggler, you should split").
>>>>>>>>> 
>>>>>>>>> Q: Is this a new DoFn subclass, or how will this integrate with the
>>>>>>>>> existing code?
>>>>>>>>> A: It's a feature of reflection-based DoFn (
>>>>>>> https://s.apache.org/a-new-do
>>>>>>>>> fn)
>>>>>>>>> - just another optional parameter of type RestrictionTracker to
>>>>>>>>> processElement() which is dynamically bound via reflection, so
>>> fully
>>>>>>>>> backward/forward compatible, and looks to users like a regular
>>> DoFn.
>>>>>>>>> 
>>>>>>>>> Q: why is fractionClaimed a double?
>>>>>>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
>>>>>>>>> rebalancing) requires a uniform way to represent progress through
>>>>>>>>> different
>>>>>>>>> sources.
>>>>>>>>> 
>>>>>>>>> Q: Spark runner is microbatch-based, so this seems to map well onto
>>>>>>>>> checkpoint/resume, right?
>>>>>>>>> A: Yes; actually the Dataflow runner is, at a worker level, also
>>>>>>>>> microbatch-based. The way SDF interacts with a runner will be very
>>>>>>> similar
>>>>>>>>> to how a Bounded/UnboundedSource interacts with a runner.
>>>>>>>>> 
>>>>>>>>> Q: Using SDF, what would be the "packaging" of the IO?
>>>>>>>>> A: Same as currently: package IO's as PTransforms and their
>>>>>>> implementation
>>>>>>>>> under the hood can be anything: Source, simple ParDo's, SDF, etc.
>>>> E.g.
>>>>>>>>> Datastore was recently refactored from BoundedSource to ParDo
>>> (ended
>>>>>> up
>>>>>>>>> simpler and more scalable), transparently to users.
>>>>>>>>> 
>>>>>>>>> Q: What's the timeline; what to do with the IOs currently in
>>>>>>> development?
>>>>>>>>> A: Timeline is O(months). Keep doing what you're doing and working
>>> on
>>>>>>> top
>>>>>>>>> of Source APIs when necessary and simple ParDo's otherwise.
>>>>>>>>> 
>>>>>>>>> Q: What's the impact for the runner writers?
>>>>>>>>> A: Tentatively expected that most of the code for running an SDF
>>> will
>>>>>> be
>>>>>>>>> common to runners, with some amount of per-runner glue code, just
>>>> like
>>>>>>>>> GBK/windowing/triggering. Impact on Dataflow runner is larger since
>>>> it
>>>>>>>>> supports dynamic rebalancing in batch mode and this is the hardest
>>>>>> part,
>>>>>>>>> but for other runners shouldn't be too hard.
>>>>>>>>> 
>>>>>>>>> JB: Talend has people who can help with this: e.g. help integrate
>>>> into
>>>>>>>>> Spark runner, refactor IOs etc. Amit also willing to chat about
>>>>>>> supporting
>>>>>>>>> SDF in Spark runner.
>>>>>>>>> 
>>>>>>>>> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael
>>>>>> will
>>>>>>>>> send a link.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <
>>>> [email protected]
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Eugene,
>>>>>>>>>> 
>>>>>>>>>> thanks for the reminder.
>>>>>>>>>> 
>>>>>>>>>> Just to prepare some topics for the call, please find some points:
>>>>>>>>>> 
>>>>>>>>>> 1. Using SDF, what would be the "packaging" of the IO ? It sounds
>>> to
>>>>>> me
>>>>>>>>>> that we can keep the IO packaging style (using with* setters for
>>> the
>>>>>> IO
>>>>>>>>>> configuration) and replace PTransform, Source, Reader, ...
>>> directly
>>>>>>> with
>>>>>>>>>> SDF. Correct ?
>>>>>>>>>> 
>>>>>>>>>> 2. What's your plan in term of release to include SDF ? We have
>>>>>> several
>>>>>>>>>> IOs in preparation and I wonder if it's worth to start to use the
>>>> new
>>>>>>>>>> SDF API or not.
>>>>>>>>>> 
>>>>>>>>>> 3. What's the impact for the runner writers ? The runners will
>>> have
>>>>>> to
>>>>>>>>>> support SDF, that could be tricky depending of the execution
>>> engine.
>>>>>> In
>>>>>>>>>> the worst case where the runner can't fully support SDF, does it
>>>> mean
>>>>>>>>>> that most of our IOs will be useless ?
>>>>>>>>>> 
>>>>>>>>>> Just my dumb topics ;)
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> See you at 8am !
>>>>>>>>>> 
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>> 
>>>>>>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hello everybody,
>>>>>>>>>>> 
>>>>>>>>>>> Just a reminder:
>>>>>>>>>>> 
>>>>>>>>>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST,
>>>> to
>>>>>>>>>>> join
>>>>>>>>>>> the call go to
>>>>>>>>>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn
>>> .
>>>>>>>>>>> I intend to go over the proposed design and then have a free-form
>>>>>>>>>>> discussion.
>>>>>>>>>>> 
>>>>>>>>>>> Please have a skim through the proposal doc:
>>> https://s.apache.org/
>>>>>>>>>>> splittable-do-fn
>>>>>>>>>>> I also made some slides that are basically a trimmed-down version
>>>> of
>>>>>>> the
>>>>>>>>>>> doc to use as a guide when conducting the meeting,
>>>>>>>>>>> 
>>>>>>>>>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047
>>>>>>>>>> Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
>>>>>>>>>> 
>>>>>>>>>>> .
>>>>>>>>>>> 
>>>>>>>>>>> I will post notes from the meeting on this thread afterwards.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks, looking forward.
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin
>>>>>>>>>>> <[email protected]
>>>>>>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> This is pretty cool! I'll be there too. (unless the hangout gets
>>>> too
>>>>>>>>>>>> 
>>>>>>>>>>> full
>>>>>>>>>> 
>>>>>>>>>>> -- if so, I'll drop out in favor of others who aren't lucky
>>> enough
>>>>>> to
>>>>>>>>>>>> 
>>>>>>>>>>> get
>>>>>>>>>> 
>>>>>>>>>>> to talk to Eugene all the time.)
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <
>>>>>>>>>>>> 
>>>>>>>>>>> [email protected]>
>>>>>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> +1 I'll join
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <
>>>>>>>>>>>>> 
>>>>>>>>>>>> [email protected]
>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> + 1, me2
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> <javascript:;>>
>>>>>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> +1 as in I'll join ;-)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> <[email protected]
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Sounds good, thanks!
>>>>>>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
>>>>>>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> com/splittabledofn
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> <javascript:;>>
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What
>>> about
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Friday
>>>>>>>>>>>> 
>>>>>>>>>>>>> 19th ?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
>>>>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Sounds great, does the suggested time over videoconference
>>>>>> work
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>> 
>>>>>>>>>>>>> you?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> [email protected] <javascript:;>>
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi Eugene
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> May we talk together next week ? I like the proposal. I
>>>>>> would
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> some details for my understanding.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
>>>>>>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> What are your thoughts on this?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain
>>>>>> more
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>> proposal if necessary, since I understand it is a lot to
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> digest.
>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over
>>>>>>> Hangouts?
>>>>>>>>>>>>>>>>>>>> (link:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> com/splittabledofn
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>>>>>>> I confirmed that it can be joined without being logged
>>>>>> into a
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Google
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> account)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Who'd be interested in attending, and does this
>>> time/date
>>>>>>> work
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> people?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> <[email protected] <javascript:;>>
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> It sounds like you are concerned about continued
>>> support
>>>>>> for
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> IO's
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> people have developed, and about backward
>>> compatibility?
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all
>>> existing
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Source-based
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> connectors will continue to work [though the document
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> proposes
>>>>>>>>>>>> 
>>>>>>>>>>>>> at
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> point to make Read.from(Source) to translate to a
>>> wrapper
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> SDF
>>>>>>>>>>>> 
>>>>>>>>>>>>> under
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> hood, to exercise the feature more and to make sure
>>> that
>>>>>> it
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>> 
>>>>>>>>>>>>> strictly
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> more powerful - but this is an optional implementation
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> detail].
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly -
>>>>>> "replacing
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Source
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> API": a better phrasing would be "introducing a new API
>>>> so
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> powerful
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> easy-to-use that hopefully people will choose it over
>>> the
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Source
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> the time, even though they don't have to" :) And we can
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> discuss
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> whether or
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> not to actually deprecate/remove the Source API at some
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> point
>>>>>>>>>>>> 
>>>>>>>>>>>>> down
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> road, once it becomes clear whether this is the case or
>>>>>> not.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> To give more context: this proposal came out of
>>>>>> discussions
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> within
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> the SDK
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam project
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> existed,
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> on
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> how to
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> make major improvements to the Source API; perhaps it
>>>> will
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> clarify
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> things
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> if I give a history of the ideas discussed:
>>>>>>>>>>>>>>>>>>>>> - The first idea was to introduce a
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Read.from(PCollection<Source>)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> transform while keeping the Source API intact - this, given
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> appropriate
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> implementation, would solve most of the scalability and
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> composability
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> issues of IO's. Then most connectors would look like :
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> ParDo<A,
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Source<B>>
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> + Read.from().
>>>>>>>>>>>>>>>>>>>>> - Then we figured that the Source class is an
>>> unnecessary
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> abstraction, as
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> it simply holds data. What if we only had a Reader<S,
>>> B>
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> class
>>>>>>>>>>>> 
>>>>>>>>>>>>> where
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> S is
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> the source type and B the output type? Then connectors
>>>>>> would
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> something
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical Read.using(Reader<S,
>>>> B>).
>>>>>>>>>>>>>>>>>>>>> - Then somebody remarked that some of the features of
>>>>>> Source
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> useful to
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress when
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> processing a
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> heavy element, or ability to produce very large output
>>> in
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> parallel.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> - The two previous bullets were already hinting that the
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Read.using()
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> primitive might not be so special: it just takes S and
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> produces
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> B:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> isn't
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> that what a ParDo does, plus some source magic, minus
>>> the
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> convenience
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> c.output() vs. the start/advance() state machine?
>>>>>>>>>>>>>>>>>>>>> - At this point it became clear that we should explore
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> unifying
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> sources
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic of
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> sources
>>>>>>>>>>>> 
>>>>>>>>>>>>> to
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> ParDo's
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> but without the limitations and coding inconveniences?
>>>> And
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic to a
>>> DoFn
>>>>>> by
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> providing
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> RangeTracker.
>>>>>>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it
>>>> became
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> clear
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> is strictly more general than sources; at least, in the
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> respect
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> sources have to produce output, while DoFn's don't: an
>>> SDF
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> may
>>>>>>>>>>>> 
>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> well
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> produce no output at all, and simply perform a side
>>>> effect
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>> 
>>>>>>>>>>>>> a
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> parallel/resumable way.
>>>>>>>>>>>>>>>>>>>>> - Then there were countless hours of discussions on
>>>>>> unifying
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> bounded/unbounded cases, on the particulars of RangeTracker
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> APIs
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> reconciling parallelization and checkpointing, what the
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> relation
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the
>>> current
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> proposal comes at a time when a couple of key
>>> ingredients
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>> 
>>>>>>>>>>>>> (almost)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular DoFn,
>>> and
>>>>>> the
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> State/Timers
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> proposal to enable unbounded work per element.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> To put it shortly:
>>>>>>>>>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and
>>>>>> will
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> writing new ones, possibly forever. There is no
>>>> interference
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> users of Source.
>>>>>>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source API,
>>>>>> taken
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> logical limit where it turns out that users' goals can be
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> accomplished
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> easier and more generically entirely within ParDo's.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Let me know what you think, and thanks again!
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> <[email protected] <javascript:;>>
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Hi Eugene,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Just a question: why is it in DoFn and note an
>>>>>> improvement
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>> 
>>>>>>>>>>>>> Source
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> ?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will have
>>> to
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> refactore
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> existing IO: basically, what you propose is to remove
>>> all
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Source
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> replace with NewDoFn.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I'm concern with this approach, especially in term of
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> timing:
>>>>>>>>>>>> 
>>>>>>>>>>>>> clearly,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> the IO is the area where we have to move forward in
>>> Beam
>>>>>> as
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> allow new users to start in their projects.
>>>>>>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS,
>>> Cassandra,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> MongoDB,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> JDBC,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> ... and some people started to learn the IO API
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> (Bounded/Unbouded
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> source, etc).
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> (Source)
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> instead
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> of introducing a NewDoFn.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> What are your thoughts for IO writer like me ? ;)
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Hello Beam community,
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would
>>>> like
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>> 
>>>>>>>>>>>>> propose
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn,
>>> which
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> allows
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> checkpointable
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded amount of
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> work
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> element.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> This allows effectively replacing the current
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Bounded/UnboundedSource
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> APIs
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more
>>> scalable
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>> 
>>>>>>>>>>>>> composable
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> the rest of the Beam programming model, and enables
>>>> many
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> use
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> cases
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> were previously difficult or impossible, as well as
>>> some
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> non-obvious new
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> use cases.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA
>>>>>> [BEAM-65]
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Beam
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> meetings, and now the whole thing is written up in a
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> document:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>        https://s.apache.org/splittable-do-fn
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Here are some things that become possible with
>>>>>> Splittable
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> DoFn:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> files
>>>>>>>>>>>> 
>>>>>>>>>>>>> - Read a collection of files that are produced by an
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> earlier
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> step
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a
>>> storage
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> system
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> that can
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> export itself to files)
>>>>>>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> partitions"
>>>>>>>>>>>> 
>>>>>>>>>>>>> DoFn
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> with a
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new
>>> records
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> while()
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> loop
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> incrementally
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> returns
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> files in a directory and a DoFn that tails a file
>>>>>>>>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common"
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> algorithm
>>>>>>>>>>>> 
>>>>>>>>>>>>> (matrix
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> squaring) with good work balancing
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> reader
>>>>>>>>>>>> 
>>>>>>>>>>>>> written
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> against
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> this API:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>    ProcessContinuation processElement(
>>>>>>>>>>>>>>>>>>>>>>>            ProcessContext context,
>>> OffsetRangeTracker
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> tracker)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>      try (KafkaConsumer<String, String> consumer =
>>>>>>>>>>>>>>>>>>>>>>> 
>>>> Kafka.subscribe(context.element().topic,
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> context.element().partition)) {
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>        consumer.seek(tracker.start());
>>>>>>>>>>>>>>>>>>>>>>>        while (true) {
>>>>>>>>>>>>>>>>>>>>>>>          ConsumerRecords<String, String> records =
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> consumer.poll(100ms);
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>          if (records == null) return done();
>>>>>>>>>>>>>>>>>>>>>>>          for (ConsumerRecord<String, String> record
>>> :
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> records)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>            if (!tracker.tryClaim(record.offset())) {
>>>>>>>>>>>>>>>>>>>>>>>              return
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>> resume().withFutureOutputWatermark(record.timestamp());
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>            }
>>>>>>>>>>>>>>>>>>>>>>>            context.output(record);
>>>>>>>>>>>>>>>>>>>>>>>          }
>>>>>>>>>>>>>>>>>>>>>>>        }
>>>>>>>>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>>>>>>>>>    }
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> The document describes in detail the motivations
>>> behind
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> feature,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> basic idea and API, open questions, and outlines an
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> incremental
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> delivery
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> plan.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new
>>>> DoFn
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> [new-do-fn]
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> and is
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> loosely related to "State and Timers for DoFn"
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> [beam-state].
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Please take a look and comment!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> [BEAM-65]
>>>> https://issues.apache.org/jira/browse/BEAM-65
>>>>>>>>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
>>>>>>>>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>>>>>>>>>> [email protected] <javascript:;>
>>>>>>>>>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Subscribe to my book: Streaming Data <
>>> http://manning.com/psaltis
>>>>> 
>>>>>>>>>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>>>>>>>>>>>>> twiiter: @itmdata <
>>>>>>> http://twitter.com/intent/user?screen_name=itmdata
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>> [email protected]
>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> --
>>>>>>>> Jean-Baptiste Onofré
>>>>>>>> [email protected]
>>>>>>>> http://blog.nanthrax.net
>>>>>>>> Talend - http://www.talend.com
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> --
>>>> Jean-Baptiste Onofré
>>>> [email protected]
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>>> 
>>> 
>> 
> 
> -- 
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 
> 

Reply via email to