Don't want to crash the tech discussion here, but... I just gave a session
at the Beam Summit about Splittable DoFns from a user's perspective (based
on what I could gather from the documentation and experimentation). Here is
the slide deck, maybe it could be useful:
https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing
(quite proud of the animations though ;-)

 _/
_/ Alex Van Boxel


On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik <[email protected]> wrote:

> Reuven, just inside the restriction tracker itself, which is scoped per
> executing SplittableDoFn. A user could still write the synchronization
> incorrectly, though, since they are currently responsible for writing it.
>
> On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax <[email protected]> wrote:
>
>> Is synchronization over an entire work item, or just inside the restriction
>> tracker? My concern is that some runners (especially streaming runners)
>> might have hundreds or thousands of parallel work items being processed for
>> the same SDF (for different keys), and I'm afraid of creating
>> lock-contention bottlenecks.
>>
>> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> The synchronization is related to Java thread safety, since concurrent
>>> access to a restriction tracker is likely needed to properly handle
>>> reading the backlog and splitting while the user's DoFn is executing and
>>> updating the restriction tracker. This is similar to the Java thread
>>> safety needed in BoundedSource and UnboundedSource for fraction consumed,
>>> backlog bytes, and splitting.
>>>
>>> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Can you give details on what the synchronization is per? Is it per key,
>>>> or global to each worker?
>>>>
>>>> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> As I was looking at the SplittableDoFn API while working towards
>>>>> making a proposal for how the backlog/splitting API could look, I found
>>>>> some sharp edges that could be improved.
>>>>>
>>>>> I noticed that:
>>>>> 1) We require users to write thread-safe code; this is something that
>>>>> we haven't asked of users when writing a DoFn.
>>>>> 2) We have "internal" methods within the RestrictionTracker that are not
>>>>> meant to be used by the runner.
>>>>>
>>>>> I can fix these issues by giving the user a forwarding restriction
>>>>> tracker[1] that provides an appropriate level of synchronization as needed
>>>>> and also provides the necessary observation hooks to see when a claim
>>>>> failed or succeeded.
>>>>>
>>>>> This requires a change to our experimental API since we need to pass
>>>>> a RestrictionTracker to the @ProcessElement method instead of a sub-type
>>>>> of RestrictionTracker.
>>>>> @ProcessElement
>>>>> public void processElement(
>>>>>     ProcessContext context, OffsetRangeTracker tracker) { ... }
>>>>> becomes:
>>>>> @ProcessElement
>>>>> public void processElement(
>>>>>     ProcessContext context,
>>>>>     RestrictionTracker<OffsetRange, Long> tracker) { ... }
>>>>>
>>>>> This provides an additional benefit that it prevents users from
>>>>> working around the RestrictionTracker APIs and potentially making
>>>>> underlying changes to the tracker outside of the tryClaim call.
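
A minimal sketch of the forwarding idea described above, assuming a
simplified tracker with a single tryClaim method. The names SimpleTracker,
ClaimObserver, SimpleOffsetTracker and ObservingSynchronizedTracker are
hypothetical and are not Beam's API or the code in the PR; the sketch only
illustrates how a wrapper can supply the locking and the claim hooks so the
user-written tracker needs neither.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for a restriction tracker (not Beam's interface). */
interface SimpleTracker<PositionT> {
  boolean tryClaim(PositionT position);
}

/** Hypothetical hook that lets a runner observe the outcome of each claim. */
interface ClaimObserver<PositionT> {
  void onClaimed(PositionT position);

  void onClaimFailed(PositionT position);
}

/** Unsynchronized tracker over [from, to), as a user might write it. */
final class SimpleOffsetTracker implements SimpleTracker<Long> {
  private final long to;
  private long lastClaimed;

  SimpleOffsetTracker(long from, long to) {
    this.to = to;
    this.lastClaimed = from - 1;
  }

  @Override
  public boolean tryClaim(Long position) {
    // Claims must move forward and stay inside the range.
    if (position <= lastClaimed || position >= to) {
      return false;
    }
    lastClaimed = position;
    return true;
  }
}

/** Forwards to the delegate under one lock and reports each outcome to the observer. */
final class ObservingSynchronizedTracker<PositionT> implements SimpleTracker<PositionT> {
  private final SimpleTracker<PositionT> delegate;
  private final ClaimObserver<PositionT> observer;

  ObservingSynchronizedTracker(
      SimpleTracker<PositionT> delegate, ClaimObserver<PositionT> observer) {
    this.delegate = delegate;
    this.observer = observer;
  }

  @Override
  public synchronized boolean tryClaim(PositionT position) {
    boolean claimed = delegate.tryClaim(position);
    if (claimed) {
      observer.onClaimed(position);
    } else {
      observer.onClaimFailed(position);
    }
    return claimed;
  }
}

final class ForwardingTrackerDemo {
  /** Claims offsets 0..4 against the range [0, 3) and records what the observer saw. */
  static List<String> run() {
    List<String> events = new ArrayList<>();
    SimpleTracker<Long> tracker =
        new ObservingSynchronizedTracker<>(
            new SimpleOffsetTracker(0, 3),
            new ClaimObserver<Long>() {
              @Override
              public void onClaimed(Long position) {
                events.add("claimed:" + position);
              }

              @Override
              public void onClaimFailed(Long position) {
                events.add("failed:" + position);
              }
            });
    for (long i = 0; i < 5; i++) {
      tracker.tryClaim(i);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Because the user code only ever sees the SimpleTracker interface, it cannot
reach the delegate's other state, which is the same benefit the paragraph
above describes for passing RestrictionTracker rather than a sub-type.
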
>>>>>
>>>>> The full implementation is available in this PR[2], and I was
>>>>> wondering what people think.
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>>>>> 2: https://github.com/apache/beam/pull/6467
>>>>>
>>>>>
>>>>> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> The changes to the API have not been proposed yet. So far it has all
>>>>>> been about what the representation is and why.
>>>>>>
>>>>>> For splitting, the current idea has been about using the backlog as a
>>>>>> way of telling the SplittableDoFn where to split, so it would be in terms
>>>>>> of whatever the SDK decided to report.
>>>>>> The runner always chooses a number for backlog that is relative to
>>>>>> the SDK's reported backlog. It would be up to the SDK to round/clamp the
>>>>>> number given by the Runner to represent something meaningful for itself.
>>>>>> For example, if the backlog that the SDK was reporting was bytes
>>>>>> remaining in a file, such as 500, then the Runner could provide some
>>>>>> value like 212.2, which the SDK would then round to 212.
>>>>>> If the backlog that the SDK was reporting was 57 Pub/Sub messages, then
>>>>>> the Runner could provide a value like 300, which would mean to read 57
>>>>>> values and then another 243 as part of the current restriction.
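
The two examples above can be sketched in a few lines of Java. This is only
an illustration under stated assumptions: the class and method names are
hypothetical, and rounding half-up and clamping to the reported backlog for
the file case are choices made here, not something the proposal specifies.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class BacklogSplitSketch {
  /**
   * File-like case (assumption): round the runner's value to whole units and
   * clamp it to the range [0, reported backlog].
   */
  static BigDecimal roundAndClamp(BigDecimal requested, BigDecimal reported) {
    BigDecimal rounded = requested.setScale(0, RoundingMode.HALF_UP);
    return rounded.max(BigDecimal.ZERO).min(reported);
  }

  /**
   * Queue-like case: a requested value above the reported backlog means
   * "read everything reported, then this many more".
   */
  static long extraBeyondReported(long requested, long reported) {
    return Math.max(0L, requested - reported);
  }

  public static void main(String[] args) {
    // 500 bytes reported, runner asks for 212.2.
    System.out.println(roundAndClamp(new BigDecimal("212.2"), new BigDecimal("500")));
    // 57 messages reported, runner asks for 300.
    System.out.println(extraBeyondReported(300, 57));
  }
}
```
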
>>>>>>
>>>>>> I believe that BoundedSource/UnboundedSource will have wrappers added
>>>>>> that provide a basic SplittableDoFn implementation so existing IOs should
>>>>>> be migrated over without API changes.
>>>>>>
>>>>>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks a lot Luke for bringing this back to the mailing list and
>>>>>>> Ryan for taking the notes.
>>>>>>>
>>>>>>> I would like to know if there was some discussion, or if you guys
>>>>>>> have given some thought to the required changes in the SDK (API)
>>>>>>> part. What will be the equivalent of `splitAtFraction` and what
>>>>>>> should IO authors do to support it?
>>>>>>>
>>>>>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Thanks to everyone who joined and for the questions asked.
>>>>>>> >
>>>>>>> > Ryan graciously collected notes of the discussion:
>>>>>>> https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>>>>>> >
>>>>>>> > The summary was to bring BoundedSource/UnboundedSource into using
>>>>>>> > a unified backlog-reporting mechanism, with optional other signals
>>>>>>> > that Dataflow has found useful (such as whether the remaining
>>>>>>> > restriction is splittable: yes, no, or unknown). Other runners can
>>>>>>> > use them or not. SDFs should report backlog and watermark as a
>>>>>>> > minimum bar. The backlog should use an arbitrary-precision float
>>>>>>> > such as Java BigDecimal to prevent issues where limited precision
>>>>>>> > removes the ability to compute deltas efficiently.
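
To make the precision point concrete, here is a small sketch (the class and
method names are hypothetical) comparing a double backlog with a BigDecimal
one. Near 1e17 adjacent doubles are 16 apart, so consuming a single unit
leaves the double unchanged, while BigDecimal still yields an exact delta.

```java
import java.math.BigDecimal;

final class BacklogPrecisionSketch {
  /** True if subtracting consumed from a double backlog changes the value at all. */
  static boolean doubleSeesDelta(double backlog, double consumed) {
    return backlog - consumed != backlog;
  }

  /** Exact difference between two backlog reports. */
  static BigDecimal exactDelta(BigDecimal before, BigDecimal after) {
    return before.subtract(after);
  }

  public static void main(String[] args) {
    // A backlog of 1e17 units with one unit consumed.
    System.out.println(doubleSeesDelta(1e17, 1.0));
    System.out.println(
        exactDelta(
            new BigDecimal("100000000000000000"), new BigDecimal("99999999999999999")));
  }
}
```
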
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>> >>
>>>>>>> >> Here is the link to join the discussion:
>>>>>>> https://meet.google.com/idc-japs-hwf
>>>>>>> >> Remember that it is this Friday Sept 14th from 11am-noon PST.
>>>>>>> >>
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <
>>>>>>> [email protected]> wrote:
>>>>>>> >>>
>>>>>>> >>> Thanks for moving forward with this, Lukasz!
>>>>>>> >>>
>>>>>>> >>> Unfortunately, can't make it on Friday but I'll sync with
>>>>>>> somebody on
>>>>>>> >>> the call (e.g. Ryan) about your discussion.
>>>>>>> >>>
>>>>>>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>>>>>> >>> > Thanks to everyone who filled out the Doodle poll. The most
>>>>>>> >>> > popular time was Friday Sept 14th from 11am-noon PST. I'll
>>>>>>> >>> > send out a calendar invite and meeting link early next week.
>>>>>>> >>> >
>>>>>>> >>> > I have received a lot of feedback on the document and have
>>>>>>> addressed
>>>>>>> >>> > some parts of it including:
>>>>>>> >>> > * clarifying terminology
>>>>>>> >>> > * processing skew due to some restrictions having their
>>>>>>> >>> > watermarks much further behind than others, affecting
>>>>>>> >>> > scheduling of bundles by runners
>>>>>>> >>> > * external throttling & I/O wait overhead reporting to make
>>>>>>> >>> > sure we don't overscale
>>>>>>> >>> >
>>>>>>> >>> > Areas that still need additional feedback and details are:
>>>>>>> >>> > * reporting progress around the work that is done and is active
>>>>>>> >>> > * more examples
>>>>>>> >>> > * unbounded restrictions being caused by an unbounded number
>>>>>>> of splits
>>>>>>> >>> > of existing unbounded restrictions (infinite work growth)
>>>>>>> >>> > * whether we should be reporting this information at the
>>>>>>> PTransform
>>>>>>> >>> > level or at the bundle level
>>>>>>> >>> >
>>>>>>> >>> >
>>>>>>> >>> >
>>>>>>> >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <[email protected]
>>>>>>> >>> > <mailto:[email protected]>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >     Thanks to all those who have shown interest in this topic
>>>>>>> >>> >     through the questions they have asked on the doc already,
>>>>>>> >>> >     and to those interested in having this discussion. I have
>>>>>>> >>> >     set up this doodle to allow people to provide their
>>>>>>> >>> >     availability:
>>>>>>> >>> >     https://doodle.com/poll/nrw7w84255xnfwqy
>>>>>>> >>> >
>>>>>>> >>> >     I'll send out the chosen time based upon people's
>>>>>>> >>> >     availability and a Hangout link by end of day Friday, so
>>>>>>> >>> >     please mark your availability using the link above.
>>>>>>> >>> >
>>>>>>> >>> >     The agenda of the meeting will be as follows:
>>>>>>> >>> >     * Overview of the proposal
>>>>>>> >>> >     * Enumerate and discuss/answer questions brought up in the
>>>>>>> meeting
>>>>>>> >>> >
>>>>>>> >>> >     Note that all questions and any discussions/answers
>>>>>>> provided will be
>>>>>>> >>> >     added to the doc for those who are unable to attend.
>>>>>>> >>> >
>>>>>>> >>> >     On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré
>>>>>>> >>> >     <[email protected] <mailto:[email protected]>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >         +1
>>>>>>> >>> >
>>>>>>> >>> >         Regards
>>>>>>> >>> >         JB
>>>>>>> >>> >         Le 31 août 2018, à 18:22, Lukasz Cwik <
>>>>>>> [email protected]
>>>>>>> >>> >         <mailto:[email protected]>> a écrit:
>>>>>>> >>> >
>>>>>>> >>> >             That is possible, I'll take people's date/time
>>>>>>> suggestions
>>>>>>> >>> >             and create a simple online poll with them.
>>>>>>> >>> >
>>>>>>> >>> >             On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw
>>>>>>> >>> >             <[email protected] <mailto:[email protected]>>
>>>>>>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >                 Thanks for taking this up. I added some
>>>>>>> comments to the
>>>>>>> >>> >                 doc. A European-friendly time for discussion
>>>>>>> would
>>>>>>> >>> >                 be great.
>>>>>>> >>> >
>>>>>>> >>> >                 On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik
>>>>>>> >>> >                 <[email protected] <mailto:[email protected]>>
>>>>>>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >                     I came up with a proposal[1] for a progress
>>>>>>> >>> >                     model based solely on the backlog, in which
>>>>>>> >>> >                     splits are based upon the remaining backlog
>>>>>>> >>> >                     we want the SDK to split at. I also give
>>>>>>> >>> >                     recommendations to runner authors as to how
>>>>>>> >>> >                     an autoscaling system could work based upon
>>>>>>> >>> >                     the measured backlog. A lot of the
>>>>>>> >>> >                     discussion around progress reporting and
>>>>>>> >>> >                     splitting in the past has been about finding
>>>>>>> >>> >                     an optimal solution. After reading a lot of
>>>>>>> >>> >                     information about work stealing, I don't
>>>>>>> >>> >                     believe there is a general solution, and it
>>>>>>> >>> >                     really is up to SplittableDoFns to be well
>>>>>>> >>> >                     behaved. I did not do much work in
>>>>>>> >>> >                     classifying what a well-behaved
>>>>>>> >>> >                     SplittableDoFn is, though. Much of this work
>>>>>>> >>> >                     builds off ideas that Eugene had documented
>>>>>>> >>> >                     in the past[2].
>>>>>>> >>> >
>>>>>>> >>> >                     I could use the community's wide knowledge
>>>>>>> >>> >                     of different I/Os to see if computing the
>>>>>>> >>> >                     backlog is practical in the way that I'm
>>>>>>> >>> >                     suggesting, and to gather people's feedback.
>>>>>>> >>> >
>>>>>>> >>> >                     If there is a lot of interest, I would like
>>>>>>> >>> >                     to hold a community video conference between
>>>>>>> >>> >                     Sept 10th and 14th about this topic. Please
>>>>>>> >>> >                     reply with your availability by Sept 6th if
>>>>>>> >>> >                     you're interested.
>>>>>>> >>> >
>>>>>>> >>> >                     1:
>>>>>>> https://s.apache.org/beam-bundles-backlog-splitting
>>>>>>> >>> >                     2:
>>>>>>> https://s.apache.org/beam-breaking-fusion
>>>>>>> >>> >
>>>>>>> >>> >                     On Mon, Aug 13, 2018 at 10:21 AM
>>>>>>> Jean-Baptiste
>>>>>>> >>> >                     Onofré <[email protected] <mailto:
>>>>>>> [email protected]>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >                         Awesome !
>>>>>>> >>> >
>>>>>>> >>> >                         Thanks Luke !
>>>>>>> >>> >
>>>>>>> >>> >                         I plan to work with you and others on
>>>>>>> this one.
>>>>>>> >>> >
>>>>>>> >>> >                         Regards
>>>>>>> >>> >                         JB
>>>>>>> >>> >                         Le 13 août 2018, à 19:14, Lukasz Cwik
>>>>>>> >>> >                         <[email protected] <mailto:
>>>>>>> [email protected]>> a
>>>>>>> >>> >                         écrit:
>>>>>>> >>> >
>>>>>>> >>> >                             I wanted to reach out that I will
>>>>>>> be
>>>>>>> >>> >                             continuing from where Eugene left
>>>>>>> off with
>>>>>>> >>> >                             SplittableDoFn. I know that many
>>>>>>> of you have
>>>>>>> >>> >                             done a bunch of work with IOs
>>>>>>> and/or runner
>>>>>>> >>> >                             integration for SplittableDoFn and
>>>>>>> would
>>>>>>> >>> >                             appreciate your help in advancing
>>>>>>> this
>>>>>>> >>> >                             awesome idea. If you have
>>>>>>> questions or
>>>>>>> >>> >                             things you want to get reviewed
>>>>>>> related to
>>>>>>> >>> >                             SplittableDoFn, feel free to send
>>>>>>> them my
>>>>>>> >>> >                             way or include me on anything
>>>>>>> SplittableDoFn
>>>>>>> >>> >                             related.
>>>>>>> >>> >
>>>>>>> >>> >                             I was part of several discussions with
>>>>>>> >>> >                             Eugene and I think the biggest
>>>>>>> >>> >                             outstanding design portion is to figure
>>>>>>> >>> >                             out how dynamic work rebalancing would
>>>>>>> >>> >                             play out with the portability APIs.
>>>>>>> >>> >                             This includes reporting of progress
>>>>>>> >>> >                             from within a bundle. I know that
>>>>>>> >>> >                             Eugene had shared some documents in
>>>>>>> >>> >                             this regard, but the position / split
>>>>>>> >>> >                             models didn't work too cleanly in a
>>>>>>> >>> >                             unified sense for bounded and unbounded
>>>>>>> >>> >                             SplittableDoFns. It will likely take me
>>>>>>> >>> >                             a while to gather my thoughts, but I
>>>>>>> >>> >                             could use your expertise as to how
>>>>>>> >>> >                             compatible these ideas are with respect
>>>>>>> >>> >                             to IOs and runners
>>>>>>> >>> >                             (Flink/Spark/Dataflow/Samza/Apex/...)
>>>>>>> >>> >                             and obviously help during
>>>>>>> >>> >                             implementation.
>>>>>>> >>> >
>>>>>>>
>>>>>>
