Reuven, just inside the restriction tracker itself which is scoped per
executing SplittableDoFn. A user could incorrectly write the
synchronization since they are currently responsible for writing it though.

On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax <[email protected]> wrote:

> is synchronization over an entire work item, or just inside restriction
> tracker? my concern is that some runners (especially streaming runners)
> might have hundreds or thousands of parallel work items being processed for
> the same SDF (for different keys), and I'm afraid of creating
> lock-contention bottlenecks.
>
> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik <[email protected]> wrote:
>
>> The synchronization is related to Java thread safety since there is
>> likely to be concurrent access needed to a restriction tracker to properly
>> handle accessing the backlog and splitting concurrently from when the users
>> DoFn is executing and updating the restriction tracker. This is similar to
>> the Java thread safety needed in BoundedSource and UnboundedSource for
>> fraction consumed, backlog bytes, and splitting.
>>
>> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <[email protected]> wrote:
>>
>>> Can you give details on what the synchronization is per? Is it per key,
>>> or global to each worker?
>>>
>>> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> As I was looking at the SplittableDoFn API while working towards making
>>>> a proposal for how the backlog/splitting API could look, I found some sharp
>>>> edges that could be improved.
>>>>
>>>> I noticed that:
>>>> 1) We require users to write thread safe code, this is something that
>>>> we haven't asked of users when writing a DoFn.
>>>> 2) We "internal" methods within the RestrictionTracker that are not
>>>> meant to be used by the runner.
>>>>
>>>> I can fix these issues by giving the user a forwarding restriction
>>>> tracker[1] that provides an appropriate level of synchronization as needed
>>>> and also provides the necessary observation hooks to see when a claim
>>>> failed or succeeded.
>>>>
>>>> This requires a change to our experimental API since we need to pass
>>>> a RestrictionTracker to the @ProcessElement method instead of a sub-type of
>>>> RestrictionTracker.
>>>> @ProcessElement
>>>> processElement(ProcessContext context, OffsetRangeTracker tracker) {
>>>> ... }
>>>> becomes:
>>>> @ProcessElement
>>>> processElement(ProcessContext context, RestrictionTracker<OffsetRange,
>>>> Long> tracker) { ... }
>>>>
>>>> This provides an additional benefit that it prevents users from working
>>>> around the RestrictionTracker APIs and potentially making underlying
>>>> changes to the tracker outside of the tryClaim call.
>>>>
>>>> Full implementation is available within this PR[2] and was wondering
>>>> what people thought.
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>>>> 2: https://github.com/apache/beam/pull/6467
>>>>
>>>>
>>>> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> The changes to the API have not been proposed yet. So far it has all
>>>>> been about what is the representation and why.
>>>>>
>>>>> For splitting, the current idea has been about using the backlog as a
>>>>> way of telling the SplittableDoFn where to split, so it would be in terms
>>>>> of whatever the SDK decided to report.
>>>>> The runner always chooses a number for backlog that is relative to the
>>>>> SDKs reported backlog. It would be upto the SDK to round/clamp the number
>>>>> given by the Runner to represent something meaningful for itself.
>>>>> For example if the backlog that the SDK was reporting was bytes
>>>>> remaining in a file such as 500, then the Runner could provide some value
>>>>> like 212.2 which the SDK would then round to 212.
>>>>> If the backlog that the SDK was reporting 57 pubsub messages, then the
>>>>> Runner could provide a value like 300 which would mean to read 57 values
>>>>> and then another 243 as part of the current restriction.
>>>>>
>>>>> I believe that BoundedSource/UnboundedSource will have wrappers added
>>>>> that provide a basic SplittableDoFn implementation so existing IOs should
>>>>> be migrated over without API changes.
>>>>>
>>>>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Thanks a lot Luke for bringing this back to the mailing list and Ryan
>>>>>> for taking
>>>>>> the notes.
>>>>>>
>>>>>> I would like to know if there was some discussion, or if you guys
>>>>>> have given
>>>>>> some thought to the required changes in the SDK (API) part. What will
>>>>>> be the
>>>>>> equivalent of `splitAtFraction` and what should IO authors do to
>>>>>> support it..
>>>>>>
>>>>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <[email protected]> wrote:
>>>>>> >
>>>>>> > Thanks to everyone who joined and for the questions asked.
>>>>>> >
>>>>>> > Ryan graciously collected notes of the discussion:
>>>>>> https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>>>>> >
>>>>>> > The summary was that bringing BoundedSource/UnboundedSource into
>>>>>> using a unified backlog-reporting mechanism with optional other signals
>>>>>> that Dataflow has found useful (such as is the remaining restriction
>>>>>> splittable (yes, no, unknown)). Other runners can use or not. SDFs should
>>>>>> report backlog and watermark as minimum bar. The backlog should use an
>>>>>> arbitrary precision float such as Java BigDecimal to prevent issues where
>>>>>> limited precision removes the ability to compute delta efficiently.
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <[email protected]>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> Here is the link to join the discussion:
>>>>>> https://meet.google.com/idc-japs-hwf
>>>>>> >> Remember that it is this Friday Sept 14th from 11am-noon PST.
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <[email protected]>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>> Thanks for moving forward with this, Lukasz!
>>>>>> >>>
>>>>>> >>> Unfortunately, can't make it on Friday but I'll sync with
>>>>>> somebody on
>>>>>> >>> the call (e.g. Ryan) about your discussion.
>>>>>> >>>
>>>>>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>>>>> >>> > Thanks for everyone who wanted to fill out the doodle poll. The
>>>>>> most
>>>>>> >>> > popular time was Friday Sept 14th from 11am-noon PST. I'll send
>>>>>> out a
>>>>>> >>> > calendar invite and meeting link early next week.
>>>>>> >>> >
>>>>>> >>> > I have received a lot of feedback on the document and have
>>>>>> addressed
>>>>>> >>> > some parts of it including:
>>>>>> >>> > * clarifying terminology
>>>>>> >>> > * processing skew due to some restrictions having their
>>>>>> watermarks much
>>>>>> >>> > further behind then others affecting scheduling of bundles by
>>>>>> runners
>>>>>> >>> > * external throttling & I/O wait overhead reporting to make
>>>>>> sure we
>>>>>> >>> > don't overscale
>>>>>> >>> >
>>>>>> >>> > Areas that still need additional feedback and details are:
>>>>>> >>> > * reporting progress around the work that is done and is active
>>>>>> >>> > * more examples
>>>>>> >>> > * unbounded restrictions being caused by an unbounded number of
>>>>>> splits
>>>>>> >>> > of existing unbounded restrictions (infinite work growth)
>>>>>> >>> > * whether we should be reporting this information at the
>>>>>> PTransform
>>>>>> >>> > level or at the bundle level
>>>>>> >>> >
>>>>>> >>> >
>>>>>> >>> >
>>>>>> >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <[email protected]
>>>>>> >>> > <mailto:[email protected]>> wrote:
>>>>>> >>> >
>>>>>> >>> >     Thanks to all those who have provided interest in this
>>>>>> topic by the
>>>>>> >>> >     questions they have asked on the doc already and for those
>>>>>> >>> >     interested in having this discussion. I have setup this
>>>>>> doodle to
>>>>>> >>> >     allow people to provide their availability:
>>>>>> >>> >     https://doodle.com/poll/nrw7w84255xnfwqy
>>>>>> >>> >
>>>>>> >>> >     I'll send out the chosen time based upon peoples
>>>>>> availability and a
>>>>>> >>> >     Hangout link by end of day Friday so please mark your
>>>>>> availability
>>>>>> >>> >     using the link above.
>>>>>> >>> >
>>>>>> >>> >     The agenda of the meeting will be as follows:
>>>>>> >>> >     * Overview of the proposal
>>>>>> >>> >     * Enumerate and discuss/answer questions brought up in the
>>>>>> meeting
>>>>>> >>> >
>>>>>> >>> >     Note that all questions and any discussions/answers
>>>>>> provided will be
>>>>>> >>> >     added to the doc for those who are unable to attend.
>>>>>> >>> >
>>>>>> >>> >     On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré
>>>>>> >>> >     <[email protected] <mailto:[email protected]>> wrote:
>>>>>> >>> >
>>>>>> >>> >         +1
>>>>>> >>> >
>>>>>> >>> >         Regards
>>>>>> >>> >         JB
>>>>>> >>> >         Le 31 août 2018, à 18:22, Lukasz Cwik <[email protected]
>>>>>> >>> >         <mailto:[email protected]>> a écrit:
>>>>>> >>> >
>>>>>> >>> >             That is possible, I'll take people's date/time
>>>>>> suggestions
>>>>>> >>> >             and create a simple online poll with them.
>>>>>> >>> >
>>>>>> >>> >             On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw
>>>>>> >>> >             <[email protected] <mailto:[email protected]>>
>>>>>> wrote:
>>>>>> >>> >
>>>>>> >>> >                 Thanks for taking this up. I added some
>>>>>> comments to the
>>>>>> >>> >                 doc. A European-friendly time for discussion
>>>>>> would
>>>>>> >>> >                 be great.
>>>>>> >>> >
>>>>>> >>> >                 On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik
>>>>>> >>> >                 <[email protected] <mailto:[email protected]>>
>>>>>> wrote:
>>>>>> >>> >
>>>>>> >>> >                     I came up with a proposal[1] for a progress
>>>>>> model
>>>>>> >>> >                     solely based off of the backlog and that
>>>>>> splits
>>>>>> >>> >                     should be based upon the remaining backlog
>>>>>> we want
>>>>>> >>> >                     the SDK to split at. I also give
>>>>>> recommendations to
>>>>>> >>> >                     runner authors as to how an autoscaling
>>>>>> system could
>>>>>> >>> >                     work based upon the measured backlog. A lot
>>>>>> of
>>>>>> >>> >                     discussions around progress reporting and
>>>>>> splitting
>>>>>> >>> >                     in the past has always been around finding
>>>>>> an
>>>>>> >>> >                     optimal solution, after reading a lot of
>>>>>> information
>>>>>> >>> >                     about work stealing, I don't believe there
>>>>>> is a
>>>>>> >>> >                     general solution and it really is upto
>>>>>> >>> >                     SplittableDoFns to be well behaved. I did
>>>>>> not do
>>>>>> >>> >                     much work in classifying what a well behaved
>>>>>> >>> >                     SplittableDoFn is though. Much of this work
>>>>>> builds
>>>>>> >>> >                     off ideas that Eugene had documented in the
>>>>>> past[2].
>>>>>> >>> >
>>>>>> >>> >                     I could use the communities wide knowledge
>>>>>> of
>>>>>> >>> >                     different I/Os to see if computing the
>>>>>> backlog is
>>>>>> >>> >                     practical in the way that I'm suggesting
>>>>>> and to
>>>>>> >>> >                     gather people's feedback.
>>>>>> >>> >
>>>>>> >>> >                     If there is a lot of interest, I would like
>>>>>> to hold
>>>>>> >>> >                     a community video conference between Sept
>>>>>> 10th and
>>>>>> >>> >                     14th about this topic. Please reply with
>>>>>> your
>>>>>> >>> >                     availability by Sept 6th if your interested.
>>>>>> >>> >
>>>>>> >>> >                     1:
>>>>>> https://s.apache.org/beam-bundles-backlog-splitting
>>>>>> >>> >                     2:
>>>>>> https://s.apache.org/beam-breaking-fusion
>>>>>> >>> >
>>>>>> >>> >                     On Mon, Aug 13, 2018 at 10:21 AM
>>>>>> Jean-Baptiste
>>>>>> >>> >                     Onofré <[email protected] <mailto:
>>>>>> [email protected]>> wrote:
>>>>>> >>> >
>>>>>> >>> >                         Awesome !
>>>>>> >>> >
>>>>>> >>> >                         Thanks Luke !
>>>>>> >>> >
>>>>>> >>> >                         I plan to work with you and others on
>>>>>> this one.
>>>>>> >>> >
>>>>>> >>> >                         Regards
>>>>>> >>> >                         JB
>>>>>> >>> >                         Le 13 août 2018, à 19:14, Lukasz Cwik
>>>>>> >>> >                         <[email protected] <mailto:
>>>>>> [email protected]>> a
>>>>>> >>> >                         écrit:
>>>>>> >>> >
>>>>>> >>> >                             I wanted to reach out that I will be
>>>>>> >>> >                             continuing from where Eugene left
>>>>>> off with
>>>>>> >>> >                             SplittableDoFn. I know that many of
>>>>>> you have
>>>>>> >>> >                             done a bunch of work with IOs
>>>>>> and/or runner
>>>>>> >>> >                             integration for SplittableDoFn and
>>>>>> would
>>>>>> >>> >                             appreciate your help in advancing
>>>>>> this
>>>>>> >>> >                             awesome idea. If you have questions
>>>>>> or
>>>>>> >>> >                             things you want to get reviewed
>>>>>> related to
>>>>>> >>> >                             SplittableDoFn, feel free to send
>>>>>> them my
>>>>>> >>> >                             way or include me on anything
>>>>>> SplittableDoFn
>>>>>> >>> >                             related.
>>>>>> >>> >
>>>>>> >>> >                             I was part of several discussions
>>>>>> with
>>>>>> >>> >                             Eugene and I think the biggest
>>>>>> outstanding
>>>>>> >>> >                             design portion is to figure out how
>>>>>> dynamic
>>>>>> >>> >                             work rebalancing would play out
>>>>>> with the
>>>>>> >>> >                             portability APIs. This includes
>>>>>> reporting of
>>>>>> >>> >                             progress from within a bundle. I
>>>>>> know that
>>>>>> >>> >                             Eugene had shared some documents in
>>>>>> this
>>>>>> >>> >                             regard but the position / split
>>>>>> models
>>>>>> >>> >                             didn't work too cleanly in a
>>>>>> unified sense
>>>>>> >>> >                             for bounded and unbounded
>>>>>> SplittableDoFns.
>>>>>> >>> >                             It will likely take me awhile to
>>>>>> gather my
>>>>>> >>> >                             thoughts but could use your
>>>>>> expertise as to
>>>>>> >>> >                             how compatible these ideas are with
>>>>>> respect
>>>>>> >>> >                             to to IOs and runners
>>>>>> >>> >                             Flink/Spark/Dataflow/Samza/Apex/...
>>>>>> and
>>>>>> >>> >                             obviously help during
>>>>>> implementation.
>>>>>> >>> >
>>>>>>
>>>>>

Reply via email to