Hello,

Thanks for the notes, Dan and Eugene, and for taking the time to do the
presentation and answer our questions.

I mentioned the ongoing work on dynamic scaling on Flink because I suppose
that it will address dynamic rebalancing eventually (there are multiple
changes going on for dynamic scaling).

https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4

https://lists.apache.org/[email protected]:lte=1M:FLIP-8

Anyway, I am far from an expert on Flink, but the Flink folks can probably
give their opinion about this and refer to a more precise document than the
ones I mentioned.

Thanks again,
Ismaël

On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <[email protected]>
wrote:

> Great summary Eugene and Dan.
>
> And thanks again for the details, explanation, and discussion.
>
> Regards
> JB
>
>
> On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
>
>> Thanks for attending, everybody!
>>
>> Here are meeting notes (thanks Dan!).
>>
>> Q: Will SplittableDoFn enable better repartitioning of the input/output
>> data?
>> A: Not really; repartitioning is orthogonal to SDF.
>>
>> Current Source API suffers from lack of composition and scalability
>> because we treat sources too much as metadata, not enough as data.
>>
>> Q(slide with transform expansion): who does the "magic"?
>> A: The runner. Checkpointing and dynamically splitting restrictions will
>> require collaboration with the runner.
>>
>> Q: How does the runner interact with the DoFn to control the restrictions?
>> Is it related to the centralized job tracker etc.?
>> A: RestrictionTracker is a simple helper object, that exists purely on the
>> worker while executing a single partition, and interacts with the worker
>> harness part of the runner. Not to be confused with the centralized job
>> tracker (master) - completely unrelated. Worker harness, of course,
>> interacts with the master in some relevant ways (e.g. Dataflow master can
>> tell "you're a straggler, you should split").
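
To make the worker-side role of the tracker a bit more concrete, here is a rough sketch (not the proposal's actual API): a tracker for an offset range where the DoFn claims offsets through tryClaim() and the harness can call checkpoint() to take back the unprocessed remainder. The names TrackerSketch, OffsetRange and SimpleOffsetTracker are made up for illustration, offsets are assumed non-negative, and claims are assumed to arrive in increasing order.

```java
final class TrackerSketch {
    /** Hypothetical half-open offset range [from, to). */
    static final class OffsetRange {
        final long from;
        final long to;

        OffsetRange(long from, long to) {
            this.from = from;
            this.to = to;
        }
    }

    /** Hypothetical tracker that lives on the worker for one restriction. */
    static final class SimpleOffsetTracker {
        private final long from;
        private long to;
        private long lastClaimed = -1; // -1: nothing claimed yet

        SimpleOffsetTracker(OffsetRange range) {
            this.from = range.from;
            this.to = range.to;
        }

        /** Called by the DoFn before it processes an offset; false means "stop here". */
        synchronized boolean tryClaim(long offset) {
            if (offset >= to) {
                return false;
            }
            lastClaimed = offset;
            return true;
        }

        /** Called by the harness: shrink this range to what was claimed, return the rest. */
        synchronized OffsetRange checkpoint() {
            long splitAt = (lastClaimed < 0) ? from : lastClaimed + 1;
            OffsetRange residual = new OffsetRange(splitAt, to);
            to = splitAt;
            return residual;
        }
    }

    public static void main(String[] args) {
        SimpleOffsetTracker tracker = new SimpleOffsetTracker(new OffsetRange(0, 10));
        tracker.tryClaim(0);
        tracker.tryClaim(3);
        OffsetRange residual = tracker.checkpoint();
        System.out.println("residual = [" + residual.from + ", " + residual.to + ")");
        System.out.println("claim 4 after checkpoint: " + tracker.tryClaim(4));
    }
}
```

Both methods are synchronized because, in this sketch, the harness is assumed to call checkpoint() from a different thread than the one running the DoFn.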
>>
>> Q: Is this a new DoFn subclass, or how will this integrate with the
>> existing code?
>> A: It's a feature of reflection-based DoFn
>> (https://s.apache.org/a-new-dofn) - just another optional parameter of
>> type RestrictionTracker to processElement() which is dynamically bound
>> via reflection, so fully backward/forward compatible, and looks to users
>> like a regular DoFn.
>>
>> Q: why is fractionClaimed a double?
>> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
>> rebalancing) requires a uniform way to represent progress through
>> different sources.
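
As an illustration of why a plain fraction works across sources, here is a small sketch that assumes progress through a restriction can be described as a half-open offset range [from, to) plus the last claimed offset. The class ProgressSketch and its fractionClaimed() helper are hypothetical, not part of the proposal.

```java
final class ProgressSketch {
    /** Fraction of [from, to) covered once lastClaimed has been claimed. */
    static double fractionClaimed(long from, long to, long lastClaimed) {
        if (to <= from) {
            return 1.0; // empty range: nothing left to do
        }
        if (lastClaimed < from) {
            return 0.0; // nothing claimed yet
        }
        double fraction = (double) (lastClaimed - from + 1) / (double) (to - from);
        return Math.min(1.0, fraction);
    }

    public static void main(String[] args) {
        // A file split covering bytes [0, 1000), last claimed byte 249.
        System.out.println(fractionClaimed(0, 1000, 249)); // prints 0.25
        // A partition covering offsets [500, 600), last claimed offset 549.
        System.out.println(fractionClaimed(500, 600, 549)); // prints 0.5
    }
}
```

The two calls describe very different inputs, yet both reduce to a number in 0..1 that job-wide logic could compare without knowing the source type.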
>>
>> Q: Spark runner is microbatch-based, so this seems to map well onto
>> checkpoint/resume, right?
>> A: Yes; actually the Dataflow runner is, at a worker level, also
>> microbatch-based. The way SDF interacts with a runner will be very similar
>> to how a Bounded/UnboundedSource interacts with a runner.
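
A toy model of that checkpoint/resume mapping, under the assumption that a restriction is an offset range and each microbatch processes a bounded number of offsets before handing the remainder back. MicrobatchSketch and run() are invented names, and no real runner is involved.

```java
import java.util.ArrayList;
import java.util.List;

final class MicrobatchSketch {
    /**
     * Processes [from, to) in bundles of at most batchSize offsets and
     * returns the [start, end) pair of every bundle that was executed.
     */
    static List<long[]> run(long from, long to, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<long[]> bundles = new ArrayList<>();
        long resumeAt = from;
        while (resumeAt < to) {
            long bundleEnd = Math.min(to, resumeAt + batchSize);
            // "Process" offsets resumeAt .. bundleEnd - 1 in this microbatch.
            bundles.add(new long[] {resumeAt, bundleEnd});
            // Checkpoint: the residual restriction [bundleEnd, to) is resumed next.
            resumeAt = bundleEnd;
        }
        return bundles;
    }

    public static void main(String[] args) {
        for (long[] bundle : run(0, 10, 4)) {
            System.out.println("[" + bundle[0] + ", " + bundle[1] + ")");
        }
    }
}
```

With a range of [0, 10) and a batch size of 4, the loop runs three bundles: [0, 4), [4, 8) and [8, 10).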
>>
>> Q: Using SDF, what would be the "packaging" of the IO?
>> A: Same as currently: package IO's as PTransforms and their implementation
>> under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g.
>> Datastore was recently refactored from BoundedSource to ParDo (ended up
>> simpler and more scalable), transparently to users.
>>
>> Q: What's the timeline; what to do with the IOs currently in development?
>> A: Timeline is O(months). Keep doing what you're doing and working on top
>> of Source APIs when necessary and simple ParDo's otherwise.
>>
>> Q: What's the impact for the runner writers?
>> A: Tentatively expected that most of the code for running an SDF will be
>> common to runners, with some amount of per-runner glue code, just like
>> GBK/windowing/triggering. Impact on Dataflow runner is larger since it
>> supports dynamic rebalancing in batch mode and this is the hardest part,
>> but for other runners shouldn't be too hard.
>>
>> JB: Talend has people who can help with this: e.g. help integrate into
>> Spark runner, refactor IOs etc. Amit also willing to chat about supporting
>> SDF in Spark runner.
>>
>> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will
>> send a link.
>>
>>
>> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>>> Hi Eugene,
>>>
>>> thanks for the reminder.
>>>
>>> Just to prepare some topics for the call, please find some points:
>>>
>>> 1. Using SDF, what would be the "packaging" of the IO? It sounds to me
>>> that we can keep the IO packaging style (using with* setters for the IO
>>> configuration) and replace PTransform, Source, Reader, ... directly with
>>> SDF. Correct?
>>>
>>> 2. What's your plan in terms of release to include SDF? We have several
>>> IOs in preparation and I wonder if it's worth starting to use the new
>>> SDF API or not.
>>>
>>> 3. What's the impact for the runner writers? The runners will have to
>>> support SDF, which could be tricky depending on the execution engine. In
>>> the worst case where the runner can't fully support SDF, does it mean
>>> that most of our IOs will be useless?
>>>
>>> Just my dumb topics ;)
>>>
>>> Thanks,
>>> See you at 8am !
>>>
>>> Regards
>>> JB
>>>
>>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
>>>
>>>> Hello everybody,
>>>>
>>>> Just a reminder:
>>>>
>>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST, to
>>>> join the call go to
>>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
>>>> I intend to go over the proposed design and then have a free-form
>>>> discussion.
>>>>
>>>> Please have a skim through the proposal doc:
>>>> https://s.apache.org/splittable-do-fn
>>>> I also made some slides that are basically a trimmed-down version of the
>>>> doc to use as a guide when conducting the meeting:
>>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
>>>>
>>>> I will post notes from the meeting on this thread afterwards.
>>>>
>>>> Thanks, looking forward.
>>>>
>>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <[email protected]>
>>>> wrote:
>>>>
>>>>> This is pretty cool! I'll be there too. (unless the hangout gets too
>>>>> full -- if so, I'll drop out in favor of others who aren't lucky
>>>>> enough to get to talk to Eugene all the time.)
>>>>>
>>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> +1 I'll join
>>>>>>
>>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj)
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>>> + 1, me2
>>>>>>>
>>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <[email protected]> wrote:
>>>>>>>
>>>>>>>> +1 as in I'll join ;-)
>>>>>>>>
>>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sounds good, thanks!
>>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
>>>>>>>>>
>>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>>
>>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What about
>>>>>>>>>> Friday 19th ?
>>>>>>>>>>
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>>
>>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi JB,
>>>>>>>>>>>
>>>>>>>>>>> Sounds great, does the suggested time over videoconference work
>>>>>>>>>>> for you?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré
>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Eugene
>>>>>>>>>>>>
>>>>>>>>>>>> May we talk together next week ? I like the proposal. I would
>>>>>>>>>>>> just need some details for my understanding.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Regards
>>>>>>>>>>>> JB
>>>>>>>>>>>>
>>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi JB,
>>>>>>>>>>>>>
>>>>>>>>>>>>> What are your thoughts on this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain more
>>>>>>>>>>>>> about this proposal if necessary, since I understand it is a
>>>>>>>>>>>>> lot to digest.
>>>>>>>>>>>>>
>>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
>>>>>>>>>>>>> (link:
>>>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
>>>>>>>>>>>>> - I confirmed that it can be joined without being logged into a
>>>>>>>>>>>>> Google account)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Who'd be interested in attending, and does this time/date work
>>>>>>>>>>>>> for people?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It sounds like you are concerned about continued support for existing
>>>>>>>>>>>>>> IO's people have developed, and about backward compatibility?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We do not need to remove the Source API, and all existing Source-based
>>>>>>>>>>>>>> connectors will continue to work [though the document proposes at some
>>>>>>>>>>>>>> point to make Read.from(Source) to translate to a wrapper SDF under the
>>>>>>>>>>>>>> hood, to exercise the feature more and to make sure that it is strictly
>>>>>>>>>>>>>> more powerful - but this is an optional implementation detail].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Perhaps the document phrases this too strongly - "replacing the Source
>>>>>>>>>>>>>> API": a better phrasing would be "introducing a new API so powerful and
>>>>>>>>>>>>>> easy-to-use that hopefully people will choose it over the Source API all
>>>>>>>>>>>>>> the time, even though they don't have to" :) And we can discuss whether
>>>>>>>>>>>>>> or not to actually deprecate/remove the Source API at some point down
>>>>>>>>>>>>>> the road, once it becomes clear whether this is the case or not.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To give more context: this proposal came out of discussions within the
>>>>>>>>>>>>>> SDK team over the past ~1.5 years, before the Beam project existed, on
>>>>>>>>>>>>>> how to make major improvements to the Source API; perhaps it will
>>>>>>>>>>>>>> clarify things if I give a history of the ideas discussed:
>>>>>>>>>>>>>> - The first idea was to introduce a Read.from(PCollection<Source>)
>>>>>>>>>>>>>> transform while keeping the Source API intact - this, given appropriate
>>>>>>>>>>>>>> implementation, would solve most of the scalability and composability
>>>>>>>>>>>>>> issues of IO's. Then most connectors would look like : ParDo<A,
>>>>>>>>>>>>>> Source<B>> + Read.from().
>>>>>>>>>>>>>> - Then we figured that the Source class is an unnecessary abstraction,
>>>>>>>>>>>>>> as it simply holds data. What if we only had a Reader<S, B> class where
>>>>>>>>>>>>>> S is the source type and B the output type? Then connectors would be
>>>>>>>>>>>>>> something like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
>>>>>>>>>>>>>> - Then somebody remarked that some of the features of Source are useful
>>>>>>>>>>>>>> to ParDo's as well: e.g. ability to report progress when processing a
>>>>>>>>>>>>>> very heavy element, or ability to produce very large output in parallel.
>>>>>>>>>>>>>> - The two previous bullets were already hinting that the Read.using()
>>>>>>>>>>>>>> primitive might not be so special: it just takes S and produces B:
>>>>>>>>>>>>>> isn't that what a ParDo does, plus some source magic, minus the
>>>>>>>>>>>>>> convenience of c.output() vs. the start/advance() state machine?
>>>>>>>>>>>>>> - At this point it became clear that we should explore unifying sources
>>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic of sources to ParDo's
>>>>>>>>>>>>>> but without the limitations and coding inconveniences? And this is how
>>>>>>>>>>>>>> SplittableDoFn was born: bringing source magic to a DoFn by providing a
>>>>>>>>>>>>>> RangeTracker.
>>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it became clear that it
>>>>>>>>>>>>>> is strictly more general than sources; at least, in the respect that
>>>>>>>>>>>>>> sources have to produce output, while DoFn's don't: an SDF may very well
>>>>>>>>>>>>>> produce no output at all, and simply perform a side effect in a
>>>>>>>>>>>>>> parallel/resumable way.
>>>>>>>>>>>>>> - Then there were countless hours of discussions on unifying the
>>>>>>>>>>>>>> bounded/unbounded cases, on the particulars of RangeTracker APIs
>>>>>>>>>>>>>> reconciling parallelization and checkpointing, what the relation between
>>>>>>>>>>>>>> SDF and DF should be, etc. They culminated in the current proposal. The
>>>>>>>>>>>>>> proposal comes at a time when a couple of key ingredients are (almost)
>>>>>>>>>>>>>> ready: NewDoFn to make SDF look like a regular DoFn, and the
>>>>>>>>>>>>>> State/Timers proposal to enable unbounded work per element.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To put it shortly:
>>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and will support
>>>>>>>>>>>>>> writing new ones, possibly forever. There is no interference with
>>>>>>>>>>>>>> current users of Source.
>>>>>>>>>>>>>> - The new API is an attempt to improve the Source API, taken to its
>>>>>>>>>>>>>> logical limit where it turns out that users' goals can be accomplished
>>>>>>>>>>>>>> easier and more generically entirely within ParDo's.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know what you think, and thanks again!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <[email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Eugene,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Just a question: why is it in DoFn and not an improvement of Source?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If I understand correctly, it means that we will have to refactor all
>>>>>>>>>>>>>>> existing IO: basically, what you propose is to remove all Source to
>>>>>>>>>>>>>>> replace with NewDoFn.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm concerned with this approach, especially in terms of timing:
>>>>>>>>>>>>>>> clearly, the IO is the area where we have to move forward in Beam as it
>>>>>>>>>>>>>>> will allow new users to start in their projects.
>>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
>>>>>>>>>>>>>>> ... and some people started to learn the IO API (Bounded/Unbounded
>>>>>>>>>>>>>>> source, etc).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API (Source) instead
>>>>>>>>>>>>>>> of introducing a NewDoFn.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What are your thoughts for IO writer like me? ;)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello Beam community,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would like to propose
>>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn, which allows
>>>>>>>>>>>>>>>> processing of a single element to be non-monolithic, i.e.
>>>>>>>>>>>>>>>> checkpointable and parallelizable, as well as doing an unbounded amount
>>>>>>>>>>>>>>>> of work per element.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This allows effectively replacing the current Bounded/UnboundedSource
>>>>>>>>>>>>>>>> APIs with DoFn's that are much easier to code, more scalable and
>>>>>>>>>>>>>>>> composable with the rest of the Beam programming model, and enables many
>>>>>>>>>>>>>>>> use cases that were previously difficult or impossible, as well as some
>>>>>>>>>>>>>>>> non-obvious new use cases.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
>>>>>>>>>>>>>>>> meetings, and now the whole thing is written up in a document:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>         https://s.apache.org/splittable-do-fn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here are some things that become possible with Splittable DoFn:
>>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of files
>>>>>>>>>>>>>>>> - Read a collection of files that are produced by an earlier step in the
>>>>>>>>>>>>>>>> pipeline (e.g. easily implement a connector to a storage system that can
>>>>>>>>>>>>>>>> export itself to files)
>>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list partitions" DoFn with a
>>>>>>>>>>>>>>>> DoFn that simply polls a consumer and outputs new records in a while()
>>>>>>>>>>>>>>>> loop
>>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that incrementally returns
>>>>>>>>>>>>>>>> new files in a directory and a DoFn that tails a file
>>>>>>>>>>>>>>>> - Implement a parallel "count friends in common" algorithm (matrix
>>>>>>>>>>>>>>>> squaring) with good work balancing
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka reader written
>>>>>>>>>>>>>>>> against this API:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     ProcessContinuation processElement(
>>>>>>>>>>>>>>>>             ProcessContext context, OffsetRangeTracker tracker) {
>>>>>>>>>>>>>>>>       try (KafkaConsumer<String, String> consumer =
>>>>>>>>>>>>>>>>                 Kafka.subscribe(context.element().topic,
>>>>>>>>>>>>>>>>                                 context.element().partition)) {
>>>>>>>>>>>>>>>>         consumer.seek(tracker.start());
>>>>>>>>>>>>>>>>         while (true) {
>>>>>>>>>>>>>>>>           ConsumerRecords<String, String> records = consumer.poll(100ms);
>>>>>>>>>>>>>>>>           if (records == null) return done();
>>>>>>>>>>>>>>>>           for (ConsumerRecord<String, String> record : records) {
>>>>>>>>>>>>>>>>             if (!tracker.tryClaim(record.offset())) {
>>>>>>>>>>>>>>>>               return resume().withFutureOutputWatermark(record.timestamp());
>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>             context.output(record);
>>>>>>>>>>>>>>>>           }
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The document describes in detail the motivations behind this feature,
>>>>>>>>>>>>>>>> the basic idea and API, open questions, and outlines an incremental
>>>>>>>>>>>>>>>> delivery plan.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new DoFn [new-do-fn] and
>>>>>>>>>>>>>>>> is loosely related to "State and Timers for DoFn" [beam-state].
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please take a look and comment!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
>>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
>>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
>>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
>>>>>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
>>>>>>
>>>>>>
>>>>>
>>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
