Thank you, JB! 
It will be helpful to have a structured access to the technical discussions.

Looking into the Splittable DoFn proposal [1] the authors are pointing what I 
think to be the current main design issue:
Source objects cannot be produced by the pipeline at runtime. 
You are probably familiar with Husky [4] where they discuss about dynamically 
object creation; maybe semantics are different
but to me it seems to tackle the same problem. Their point could than fit into 
the Beam model?
I wonder if they could aspire to be a runner of Beam, I would appreciate any 
remarks on this.
 
Is this proposal mainly related to supporting autoscaling and dynamic work 
rebalancing as described in [2] & [3]?

Thank you and I am sorry if I interrupted your discussion on the proposal.

[1] 
https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#
 
<https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#>
[2] 
https://cloud.google.com/blog/big-data/2016/03/comparing-cloud-dataflow-autoscaling-to-spark-and-hadoop
 
<https://cloud.google.com/blog/big-data/2016/03/comparing-cloud-dataflow-autoscaling-to-spark-and-hadoop>
[3] 
https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
 
<https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
[4] http://www.husky-project.com/ <http://www.husky-project.com/> ; 
http://www.vldb.org/pvldb/vol9/p420-yang.pdf 
<http://www.vldb.org/pvldb/vol9/p420-yang.pdf>

Best,
Ovidiu

> On 29 Aug 2016, at 14:54, Jean-Baptiste Onofré <[email protected]> wrote:
> 
> Hi Ovidiu,
> 
> We had a "Technical Discussions" link on the website menu, but I can't see it 
> anymore on the website (I just see "Technical Vision").
> It contains all documents on which we are discussing.
> 
> Agree with you to have an area where we store all "Technical Discussion 
> Documents".
> 
> Let me discuss with Frances about that.
> 
> Thanks !
> Regards
> JB
> 
> On 08/29/2016 02:50 PM, Ovidiu-Cristian MARCU wrote:
>> Hi everyone
>> 
>> Is there any repository where one can track all proposals, something like 
>> Flink does with this wiki [1]?
>> 
>> [1] 
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>  
>> <https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals>
>> 
>> Thanks
>> Ovidiu
>> 
>>> On 29 Aug 2016, at 12:01, Jean-Baptiste Onofré <[email protected]> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> Indeed, it's something we discussed during our call.
>>> 
>>> AFAIU, it's one function of the tracker. When doing the tracker tryClaim 
>>> (with offset, partition id, or any kind of tracked "ID"), if the claim is 
>>> not possible, then we will update the watermark.
>>> 
>>> So the tracker is useful to determine the "split" and also to deal with 
>>> watermark.
>>> 
>>> Regards
>>> JB
>>> 
>>> On 08/29/2016 11:55 AM, Aljoscha Krettek wrote:
>>>> Hi,
>>>> I have another question about this: currently, unbounded sources have
>>>> special logic for determining the watermark and the system periodically
>>>> asks the sources for the current watermark. As I understood it, watermarks
>>>> are only "generated" at the sources. How will this work when sources are
>>>> implemented as a combination of DoFns and SplittableDoFns? Will
>>>> SplittableDoFns be asked for a watermark, does this mean that watermarks
>>>> can then be "generated" at any operation?
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>> 
>>>> On Sun, 21 Aug 2016 at 22:34 Eugene Kirpichov 
>>>> <[email protected]>
>>>> wrote:
>>>> 
>>>>> Hi JB,
>>>>> 
>>>>> Yes, I'm assuming you're referring to the "magic" part on the transform
>>>>> expansion diagram. This is indeed runner-specific, and timers+state are
>>>>> likely the simplest way to do this for an SDF that does unbounded amount 
>>>>> of
>>>>> work.
>>>>> 
>>>>> On Sun, Aug 21, 2016 at 12:14 PM Jean-Baptiste Onofré <[email protected]>
>>>>> wrote:
>>>>> 
>>>>>> Anyway, from a runner perspective, we will have kind of API (part of the
>>>>>> Runner API) to "orchestrate" the SDF as we discussed during the call,
>>>>>> right ?
>>>>>> 
>>>>>> Regards
>>>>>> JB
>>>>>> 
>>>>>> On 08/21/2016 07:24 PM, Eugene Kirpichov wrote:
>>>>>>> Hi Aljoscha,
>>>>>>> This is an excellent question! And the answer is, we don't need any new
>>>>>>> concepts like "SDF executor" and can rely on the per-key state and
>>>>> timers
>>>>>>> machinery that already exists in all runners because it's necessary to
>>>>>>> implement windowing/triggering properly.
>>>>>>> 
>>>>>>> Note that this is already somewhat addressed in the previously posted
>>>>>> State
>>>>>>> and Timers proposal https://s.apache.org/beam-state , under "per-key
>>>>>>> workflows".
>>>>>>> 
>>>>>>> Think of it this way, using the Kafka example: we'll expand it into a
>>>>>>> transform:
>>>>>>> 
>>>>>>> (1) ParDo { topic -> (unique key, topic, partition, [0, inf))) for
>>>>>>> partition in topic.listPartitions() }
>>>>>>> (2) GroupByKey
>>>>>>> (3) ParDo { key, topic, partition, R -> Kafka reader code in the
>>>>>>> proposal/slides }
>>>>>>> - R is the OffsetRange restriction which in this case will be always
>>>>> of
>>>>>>> the form [startOffset, inf).
>>>>>>> - there'll be just 1 value per key, but we use GBK to just get access
>>>>>> to
>>>>>>> the per-key state/timers machinery. This may be runner-specific; maybe
>>>>>> some
>>>>>>> runners don't need a GBK to do that.
>>>>>>> 
>>>>>>> Now suppose the topic has two partitions, P1 and P2, and they get
>>>>>> assigned
>>>>>>> unique keys K1, K2.
>>>>>>> Then the input to (3) will be a collection of: (K1, topic, P1, [0,
>>>>> inf)),
>>>>>>> (K2, topic, P2, [0, inf)).
>>>>>>> Suppose we have just 1 worker with just 1 thread. Now, how will this
>>>>>> thread
>>>>>>> be able to produce elements from both P1 and P2? here's how.
>>>>>>> 
>>>>>>> The thread will process (K1, topic, P1, [0, inf)), checkpoint after a
>>>>>>> certain time or after a certain number of elements are output (just
>>>>> like
>>>>>>> with the current UnboundedSource reading code) producing a residual
>>>>>>> restriction R1' (basically a new start timestamp), put R11 into the
>>>>>> per-key
>>>>>>> state and set a timer T1 to resume.
>>>>>>> Then it will process (K2, topic, P2, [0, inf)), do the same producing a
>>>>>>> residual restriction R2' and setting a timer T2 to resume.
>>>>>>> Then timer T1 will fire in the context of the key K1. The thread will
>>>>>> call
>>>>>>> processElement again, this time supplying R1' as the restriction; the
>>>>>>> process repeats and after a while it checkpoints and stores R1'' into
>>>>>> state
>>>>>>> of K1.
>>>>>>> Then timer T2 will fire in the context of K2, run processElement for a
>>>>>>> while, set a new timer and store R2'' into the state of K2.
>>>>>>> Etc.
>>>>>>> If partition 1 goes away, the processElement call will return "do not
>>>>>>> resume", so a timer will not be set and instead the state associated
>>>>> with
>>>>>>> K1 will be GC'd.
>>>>>>> 
>>>>>>> So basically it's almost like cooperative thread scheduling: things run
>>>>>> for
>>>>>>> a while, until the runner tells them to checkpoint, then they set a
>>>>> timer
>>>>>>> to resume themselves, and the runner fires the timers, and the process
>>>>>>> repeats. And, again, this only requires things that runners can already
>>>>>> do
>>>>>>> - state and timers, but no new concept of SDF executor (and
>>>>> consequently
>>>>>> no
>>>>>>> necessity to choose/tune how many you need).
>>>>>>> 
>>>>>>> Makes sense?
>>>>>>> 
>>>>>>> On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <[email protected]>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> I have another question that I think wasn't addressed in the meeting.
>>>>> At
>>>>>>>> least it wasn't mentioned in the notes.
>>>>>>>> 
>>>>>>>> In the context of replacing sources by a combination of to SDFs, how
>>>>> do
>>>>>> you
>>>>>>>> determine how many "SDF executor" instances you need downstream? For
>>>>> the
>>>>>>>> sake of argument assume that both SDFs are executed with parallelism 1
>>>>>> (or
>>>>>>>> one per worker). Now, if you have a file source that reads from a
>>>>> static
>>>>>>>> set of files the first SDF would emit the filenames while the second
>>>>> SDF
>>>>>>>> would receive the filenames and emit their contents. This works well
>>>>> and
>>>>>>>> the downstream SDF can process one filename after the other. Now,
>>>>> think
>>>>>> of
>>>>>>>> something like a Kafka source. The first SDF would emit the partitions
>>>>>> (say
>>>>>>>> 4 partitions, in this example) and the second SDF would be responsible
>>>>>> for
>>>>>>>> reading from a topic and emitting elements. Reading from one topic
>>>>> never
>>>>>>>> finishes so you can't process the topics in series. I think you would
>>>>>> need
>>>>>>>> to have 4 downstream "SDF executor" instances. The question now is:
>>>>> how
>>>>>> do
>>>>>>>> you determine whether you are in the first or the second situation?
>>>>>>>> 
>>>>>>>> Probably I'm just overlooking something and this is already dealt with
>>>>>>>> somewhere... :-)
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>> 
>>>>>>>> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <[email protected]> wrote:
>>>>>>>> 
>>>>>>>>> Hello,
>>>>>>>>> 
>>>>>>>>> Thanks for the notes both Dan and Eugene, and for taking the time to
>>>>> do
>>>>>>>> the
>>>>>>>>> presentation and  answer our questions.
>>>>>>>>> 
>>>>>>>>> I mentioned the ongoing work on dynamic scaling on Flink because I
>>>>>>>> suppose
>>>>>>>>> that it will address dynamic rebalancing eventually (there are
>>>>> multiple
>>>>>>>>> changes going on for dynamic scaling).
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
>>>>>>>>> 
>>>>>>>>> 
>>>>> https://lists.apache.org/[email protected]:lte=1M:FLIP-8
>>>>>>>>> 
>>>>>>>>> Anyway I am far from an expert on flink, but probably the flink guys
>>>>>> can
>>>>>>>>> give their opinion about this and refer to a more precise document
>>>>> that
>>>>>>>> the
>>>>>>>>> ones I mentioned..
>>>>>>>>> 
>>>>>>>>> ​Thanks again,
>>>>>>>>> Ismaël​
>>>>>>>>> 
>>>>>>>>> On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <
>>>>> [email protected]
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Great summary Eugene and Dan.
>>>>>>>>>> 
>>>>>>>>>> And thanks again for the details, explanation, and discussion.
>>>>>>>>>> 
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
>>>>>>>>>> 
>>>>>>>>>>> Thanks for attending, everybody!
>>>>>>>>>>> 
>>>>>>>>>>> Here are meeting notes (thanks Dan!).
>>>>>>>>>>> 
>>>>>>>>>>> Q: Will SplittableDoFn enable better repartitioning of the
>>>>>>>> input/output
>>>>>>>>>>> data?
>>>>>>>>>>> A: Not really; repartitioning is orthogonal to SDF.
>>>>>>>>>>> 
>>>>>>>>>>> Current Source API suffers from lack of composition and scalability
>>>>>>>>>>> because
>>>>>>>>>>> we treat sources too much as metadata, not enough as data.
>>>>>>>>>>> 
>>>>>>>>>>> Q(slide with transform expansion): who does the "magic"?
>>>>>>>>>>> A: The runner. Checkpointing and dynamically splitting restrictions
>>>>>>>> will
>>>>>>>>>>> require collaboration with the runner.
>>>>>>>>>>> 
>>>>>>>>>>> Q: How does the runner interact with the DoFn to control the
>>>>>>>>> restrictions?
>>>>>>>>>>> Is it related to the centralized job tracker etc.?
>>>>>>>>>>> A: RestrictionTracker is a simple helper object, that exists purely
>>>>>> on
>>>>>>>>> the
>>>>>>>>>>> worker while executing a single partition, and interacts with the
>>>>>>>> worker
>>>>>>>>>>> harness part of the runner. Not to be confused with the centralized
>>>>>>>> job
>>>>>>>>>>> tracker (master) - completely unrelated. Worker harness, of course,
>>>>>>>>>>> interacts with the master in some relevant ways (e.g. Dataflow
>>>>> master
>>>>>>>>> can
>>>>>>>>>>> tell "you're a straggler, you should split").
>>>>>>>>>>> 
>>>>>>>>>>> Q: Is this a new DoFn subclass, or how will this integrate with the
>>>>>>>>>>> existing code?
>>>>>>>>>>> A: It's a feature of reflection-based DoFn (
>>>>>>>>> https://s.apache.org/a-new-do
>>>>>>>>>>> fn)
>>>>>>>>>>> - just another optional parameter of type RestrictionTracker to
>>>>>>>>>>> processElement() which is dynamically bound via reflection, so
>>>>> fully
>>>>>>>>>>> backward/forward compatible, and looks to users like a regular
>>>>> DoFn.
>>>>>>>>>>> 
>>>>>>>>>>> Q: why is fractionClaimed a double?
>>>>>>>>>>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
>>>>>>>>>>> rebalancing) requires a uniform way to represent progress through
>>>>>>>>>>> different
>>>>>>>>>>> sources.
>>>>>>>>>>> 
>>>>>>>>>>> Q: Spark runner is microbatch-based, so this seems to map well onto
>>>>>>>>>>> checkpoint/resume, right?
>>>>>>>>>>> A: Yes; actually the Dataflow runner is, at a worker level, also
>>>>>>>>>>> microbatch-based. The way SDF interacts with a runner will be very
>>>>>>>>> similar
>>>>>>>>>>> to how a Bounded/UnboundedSource interacts with a runner.
>>>>>>>>>>> 
>>>>>>>>>>> Q: Using SDF, what would be the "packaging" of the IO?
>>>>>>>>>>> A: Same as currently: package IO's as PTransforms and their
>>>>>>>>> implementation
>>>>>>>>>>> under the hood can be anything: Source, simple ParDo's, SDF, etc.
>>>>>> E.g.
>>>>>>>>>>> Datastore was recently refactored from BoundedSource to ParDo
>>>>> (ended
>>>>>>>> up
>>>>>>>>>>> simpler and more scalable), transparently to users.
>>>>>>>>>>> 
>>>>>>>>>>> Q: What's the timeline; what to do with the IOs currently in
>>>>>>>>> development?
>>>>>>>>>>> A: Timeline is O(months). Keep doing what you're doing and working
>>>>> on
>>>>>>>>> top
>>>>>>>>>>> of Source APIs when necessary and simple ParDo's otherwise.
>>>>>>>>>>> 
>>>>>>>>>>> Q: What's the impact for the runner writers?
>>>>>>>>>>> A: Tentatively expected that most of the code for running an SDF
>>>>> will
>>>>>>>> be
>>>>>>>>>>> common to runners, with some amount of per-runner glue code, just
>>>>>> like
>>>>>>>>>>> GBK/windowing/triggering. Impact on Dataflow runner is larger since
>>>>>> it
>>>>>>>>>>> supports dynamic rebalancing in batch mode and this is the hardest
>>>>>>>> part,
>>>>>>>>>>> but for other runners shouldn't be too hard.
>>>>>>>>>>> 
>>>>>>>>>>> JB: Talend has people who can help with this: e.g. help integrate
>>>>>> into
>>>>>>>>>>> Spark runner, refactor IOs etc. Amit also willing to chat about
>>>>>>>>> supporting
>>>>>>>>>>> SDF in Spark runner.
>>>>>>>>>>> 
>>>>>>>>>>> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael
>>>>>>>> will
>>>>>>>>>>> send a link.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <
>>>>>> [email protected]
>>>>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi Eugene,
>>>>>>>>>>>> 
>>>>>>>>>>>> thanks for the reminder.
>>>>>>>>>>>> 
>>>>>>>>>>>> Just to prepare some topics for the call, please find some points:
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. Using SDF, what would be the "packaging" of the IO ? It sounds
>>>>> to
>>>>>>>> me
>>>>>>>>>>>> that we can keep the IO packaging style (using with* setters for
>>>>> the
>>>>>>>> IO
>>>>>>>>>>>> configuration) and replace PTransform, Source, Reader, ...
>>>>> directly
>>>>>>>>> with
>>>>>>>>>>>> SDF. Correct ?
>>>>>>>>>>>> 
>>>>>>>>>>>> 2. What's your plan in term of release to include SDF ? We have
>>>>>>>> several
>>>>>>>>>>>> IOs in preparation and I wonder if it's worth to start to use the
>>>>>> new
>>>>>>>>>>>> SDF API or not.
>>>>>>>>>>>> 
>>>>>>>>>>>> 3. What's the impact for the runner writers ? The runners will
>>>>> have
>>>>>>>> to
>>>>>>>>>>>> support SDF, that could be tricky depending of the execution
>>>>> engine.
>>>>>>>> In
>>>>>>>>>>>> the worst case where the runner can't fully support SDF, does it
>>>>>> mean
>>>>>>>>>>>> that most of our IOs will be useless ?
>>>>>>>>>>>> 
>>>>>>>>>>>> Just my dumb topics ;)
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> See you at 8am !
>>>>>>>>>>>> 
>>>>>>>>>>>> Regards
>>>>>>>>>>>> JB
>>>>>>>>>>>> 
>>>>>>>>>>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hello everybody,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Just a reminder:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST,
>>>>>> to
>>>>>>>>>>>>> join
>>>>>>>>>>>>> the call go to
>>>>>>>>>>>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn
>>>>> .
>>>>>>>>>>>>> I intend to go over the proposed design and then have a free-form
>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Please have a skim through the proposal doc:
>>>>> https://s.apache.org/
>>>>>>>>>>>>> splittable-do-fn
>>>>>>>>>>>>> I also made some slides that are basically a trimmed-down version
>>>>>> of
>>>>>>>>> the
>>>>>>>>>>>>> doc to use as a guide when conducting the meeting,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047
>>>>>>>>>>>> Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
>>>>>>>>>>>> 
>>>>>>>>>>>>> .
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I will post notes from the meeting on this thread afterwards.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks, looking forward.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin
>>>>>>>>>>>>> <[email protected]
>>>>>>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This is pretty cool! I'll be there too. (unless the hangout gets
>>>>>> too
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> full
>>>>>>>>>>>> 
>>>>>>>>>>>>> -- if so, I'll drop out in favor of others who aren't lucky
>>>>> enough
>>>>>>>> to
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> get
>>>>>>>>>>>> 
>>>>>>>>>>>>> to talk to Eugene all the time.)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> +1 I'll join
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> + 1, me2
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> <javascript:;>>
>>>>>>>>>>>> 
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> +1 as in I'll join ;-)
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> <[email protected]
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Sounds good, thanks!
>>>>>>>>>>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
>>>>>>>>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> com/splittabledofn
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> <javascript:;>>
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What
>>>>> about
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Friday
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 19th ?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
>>>>>>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Sounds great, does the suggested time over videoconference
>>>>>>>> work
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> you?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> [email protected] <javascript:;>>
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hi Eugene
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> May we talk together next week ? I like the proposal. I
>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> some details for my understanding.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
>>>>>>>>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> What are your thoughts on this?
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain
>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>> proposal if necessary, since I understand it is a lot to
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> digest.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over
>>>>>>>>> Hangouts?
>>>>>>>>>>>>>>>>>>>>>> (link:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> com/splittabledofn
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>>>>>>>>> I confirmed that it can be joined without being logged
>>>>>>>> into a
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Google
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> account)
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Who'd be interested in attending, and does this
>>>>> time/date
>>>>>>>>> work
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> people?
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> <[email protected] <javascript:;>>
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> It sounds like you are concerned about continued
>>>>> support
>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> IO's
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> people have developed, and about backward
>>>>> compatibility?
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> We do not need to remove the Source API, and all
>>>>> existing
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Source-based
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> connectors will continue to work [though the document
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> proposes
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> point to make Read.from(Source) to translate to a
>>>>> wrapper
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> SDF
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> under
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> hood, to exercise the feature more and to make sure
>>>>> that
>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> strictly
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> more powerful - but this is an optional implementation
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> detail].
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Perhaps the document phrases this too strongly -
>>>>>>>> "replacing
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Source
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> API": a better phrasing would be "introducing a new API
>>>>>> so
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> powerful
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> easy-to-use that hopefully people will choose it over
>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Source
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> the time, even though they don't have to" :) And we can
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> discuss
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> whether or
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> not to actually deprecate/remove the Source API at some
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> point
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> down
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> road, once it becomes clear whether this is the case or
>>>>>>>> not.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> To give more context: this proposal came out of
>>>>>>>> discussions
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> within
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> the SDK
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> team over the past ~1.5 years, before the Beam project
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> existed,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> how to
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> make major improvements to the Source API; perhaps it
>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> clarify
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> things
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> if I give a history of the ideas discussed:
>>>>>>>>>>>>>>>>>>>>>>> - The first idea was to introduce a
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Read.from(PCollection<Source>)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> transform while keeping the Source API intact - this, given
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> appropriate
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> implementation, would solve most of the scalability and
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> composability
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> issues of IO's. Then most connectors would look like :
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> ParDo<A,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Source<B>>
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> + Read.from().
>>>>>>>>>>>>>>>>>>>>>>> - Then we figured that the Source class is an
>>>>> unnecessary
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> abstraction, as
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> it simply holds data. What if we only had a Reader<S,
>>>>> B>
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> class
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> where
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> S is
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> the source type and B the output type? Then connectors
>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> something
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> like: ParDo<A, S> + hypothetical Read.using(Reader<S,
>>>>>> B>).
>>>>>>>>>>>>>>>>>>>>>>> - Then somebody remarked that some of the features of
>>>>>>>> Source
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> useful to
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> ParDo's as well: e.g. ability to report progress when
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> processing a
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> heavy element, or ability to produce very large output
>>>>> in
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> parallel.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> - The two previous bullets were already hinting that the
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Read.using()
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> primitive might not be so special: it just takes S and
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> produces
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> B:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> isn't
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> that what a ParDo does, plus some source magic, minus
>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> convenience
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> c.output() vs. the start/advance() state machine?
>>>>>>>>>>>>>>>>>>>>>>> - At this point it became clear that we should explore
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> unifying
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> sources
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic of
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> sources
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> ParDo's
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> but without the limitations and coding inconveniences?
>>>>>> And
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic to a
>>>>> DoFn
>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> providing
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> RangeTracker.
>>>>>>>>>>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it
>>>>>> became
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> is strictly more general than sources; at least, in the
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> respect
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> sources have to produce output, while DoFn's don't: an
>>>>> SDF
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> may
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> well
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> produce no output at all, and simply perform a side
>>>>>> effect
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> parallel/resumable way.
>>>>>>>>>>>>>>>>>>>>>>> - Then there were countless hours of discussions on
>>>>>>>> unifying
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> bounded/unbounded cases, on the particulars of RangeTracker
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> APIs
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> reconciling parallelization and checkpointing, what the
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> relation
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> between
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the
>>>>> current
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> proposal.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> The
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> proposal comes at a time when a couple of key
>>>>> ingredients
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> (almost)
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular DoFn,
>>>>> and
>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> State/Timers
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> proposal to enable unbounded work per element.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> To put it shortly:
>>>>>>>>>>>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and
>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> support
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> writing new ones, possibly forever. There is no
>>>>>> interference
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> users of Source.
>>>>>>>>>>>>>>>>>>>>>>> - The new API is an attempt to improve the Source API,
>>>>>>>> taken
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> logical limit where it turns out that users' goals can be
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> accomplished
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> easier and more generically entirely within ParDo's.
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Let me know what you think, and thanks again!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> <[email protected] <javascript:;>>
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Hi Eugene,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Just a question: why is it in DoFn and note an
>>>>>>>> improvement
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Source
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> ?
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> If I understand correctly, it means that we will have
>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> refactore
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> existing IO: basically, what you propose is to remove
>>>>> all
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Source
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> replace with NewDoFn.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> I'm concern with this approach, especially in term of
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> timing:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> clearly,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> the IO is the area where we have to move forward in
>>>>> Beam
>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> allow new users to start in their projects.
>>>>>>>>>>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS,
>>>>> Cassandra,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> MongoDB,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> JDBC,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> ... and some people started to learn the IO API
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> (Bounded/Unbouded
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> source, etc).
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> (Source)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> instead
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> of introducing a NewDoFn.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> What are your thoughts for IO writer like me ? ;)
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hello Beam community,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would
>>>>>> like
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> propose
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn,
>>>>> which
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> allows
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> of a single element to be non-monolithic, i.e.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> checkpointable
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> parallelizable, as well as doing an unbounded amount of
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> work
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> element.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> This allows effectively replacing the current
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Bounded/UnboundedSource
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> APIs
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> with DoFn's that are much easier to code, more
>>>>> scalable
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> composable
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> the rest of the Beam programming model, and enables
>>>>>> many
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> use
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> cases
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> were previously difficult or impossible, as well as
>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> non-obvious new
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> use cases.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA
>>>>>>>> [BEAM-65]
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Beam
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> meetings, and now the whole thing is written up in a
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> document:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>       https://s.apache.org/splittable-do-fn
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Here are some things that become possible with
>>>>>>>> Splittable
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> DoFn:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> files
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> - Read a collection of files that are produced by an
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> earlier
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> step
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a
>>>>> storage
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> system
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> that can
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> export itself to files)
>>>>>>>>>>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> partitions"
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> DoFn
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> with a
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new
>>>>> records
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> while()
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> loop
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> incrementally
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> returns
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> files in a directory and a DoFn that tails a file
>>>>>>>>>>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common"
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> algorithm
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> (matrix
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> squaring) with good work balancing
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> reader
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> written
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> against
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> this API:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>   ProcessContinuation processElement(
>>>>>>>>>>>>>>>>>>>>>>>>>           ProcessContext context,
>>>>> OffsetRangeTracker
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> tracker)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>     try (KafkaConsumer<String, String> consumer =
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>> Kafka.subscribe(context.element().topic,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> context.element().partition)) {
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>       consumer.seek(tracker.start());
>>>>>>>>>>>>>>>>>>>>>>>>>       while (true) {
>>>>>>>>>>>>>>>>>>>>>>>>>         ConsumerRecords<String, String> records =
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> consumer.poll(100ms);
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>         if (records == null) return done();
>>>>>>>>>>>>>>>>>>>>>>>>>         for (ConsumerRecord<String, String> record
>>>>> :
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> records)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> {
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>           if (!tracker.tryClaim(record.offset())) {
>>>>>>>>>>>>>>>>>>>>>>>>>             return
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>> resume().withFutureOutputWatermark(record.timestamp());
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>>           }
>>>>>>>>>>>>>>>>>>>>>>>>>           context.output(record);
>>>>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> The document describes in detail the motivations
>>>>> behind
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> feature,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> basic idea and API, open questions, and outlines an
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> incremental
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> delivery
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> plan.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new
>>>>>> DoFn
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> [new-do-fn]
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> and is
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> loosely related to "State and Timers for DoFn"
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> [beam-state].
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Please take a look and comment!
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> [BEAM-65]
>>>>>> https://issues.apache.org/jira/browse/BEAM-65
>>>>>>>>>>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
>>>>>>>>>>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>>>>>>>>>>>> [email protected] <javascript:;>
>>>>>>>>>>>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Subscribe to my book: Streaming Data <
>>>>> http://manning.com/psaltis
>>>>>>> 
>>>>>>>>>>>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>>>>>>>>>>>>>>> twiiter: @itmdata <
>>>>>>>>> http://twitter.com/intent/user?screen_name=itmdata
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>> --
>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>> [email protected]
>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>> [email protected]
>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> [email protected]
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>> 
>>> 
>> 
>> 
> 
> -- 
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 
> 

Reply via email to