Can you give details on what the synchronization is per? Is it per key, or
global to each worker?

On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <[email protected]> wrote:

> As I was looking at the SplittableDoFn API while working towards making a
> proposal for how the backlog/splitting API could look, I found some sharp
> edges that could be improved.
>
> I noticed that:
> 1) We require users to write thread safe code, this is something that we
> haven't asked of users when writing a DoFn.
> 2) We "internal" methods within the RestrictionTracker that are not meant
> to be used by the runner.
>
> I can fix these issues by giving the user a forwarding restriction
> tracker[1] that provides an appropriate level of synchronization as needed
> and also provides the necessary observation hooks to see when a claim
> failed or succeeded.
>
> This requires a change to our experimental API since we need to pass
> a RestrictionTracker to the @ProcessElement method instead of a sub-type of
> RestrictionTracker.
> @ProcessElement
> processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
> becomes:
> @ProcessElement
> processElement(ProcessContext context, RestrictionTracker<OffsetRange,
> Long> tracker) { ... }
>
> This provides an additional benefit that it prevents users from working
> around the RestrictionTracker APIs and potentially making underlying
> changes to the tracker outside of the tryClaim call.
>
> Full implementation is available within this PR[2] and was wondering what
> people thought.
>
> 1:
> https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
> 2: https://github.com/apache/beam/pull/6467
>
>
> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <[email protected]> wrote:
>
>> The changes to the API have not been proposed yet. So far it has all been
>> about what is the representation and why.
>>
>> For splitting, the current idea has been about using the backlog as a way
>> of telling the SplittableDoFn where to split, so it would be in terms of
>> whatever the SDK decided to report.
>> The runner always chooses a number for backlog that is relative to the
>> SDKs reported backlog. It would be upto the SDK to round/clamp the number
>> given by the Runner to represent something meaningful for itself.
>> For example if the backlog that the SDK was reporting was bytes remaining
>> in a file such as 500, then the Runner could provide some value like 212.2
>> which the SDK would then round to 212.
>> If the backlog that the SDK was reporting 57 pubsub messages, then the
>> Runner could provide a value like 300 which would mean to read 57 values
>> and then another 243 as part of the current restriction.
>>
>> I believe that BoundedSource/UnboundedSource will have wrappers added
>> that provide a basic SplittableDoFn implementation so existing IOs should
>> be migrated over without API changes.
>>
>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> Thanks a lot Luke for bringing this back to the mailing list and Ryan
>>> for taking
>>> the notes.
>>>
>>> I would like to know if there was some discussion, or if you guys have
>>> given
>>> some thought to the required changes in the SDK (API) part. What will be
>>> the
>>> equivalent of `splitAtFraction` and what should IO authors do to support
>>> it..
>>>
>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <[email protected]> wrote:
>>> >
>>> > Thanks to everyone who joined and for the questions asked.
>>> >
>>> > Ryan graciously collected notes of the discussion:
>>> https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>> >
>>> > The summary was that bringing BoundedSource/UnboundedSource into using
>>> a unified backlog-reporting mechanism with optional other signals that
>>> Dataflow has found useful (such as is the remaining restriction splittable
>>> (yes, no, unknown)). Other runners can use or not. SDFs should report
>>> backlog and watermark as minimum bar. The backlog should use an arbitrary
>>> precision float such as Java BigDecimal to prevent issues where limited
>>> precision removes the ability to compute delta efficiently.
>>> >
>>> >
>>> >
>>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <[email protected]> wrote:
>>> >>
>>> >> Here is the link to join the discussion:
>>> https://meet.google.com/idc-japs-hwf
>>> >> Remember that it is this Friday Sept 14th from 11am-noon PST.
>>> >>
>>> >>
>>> >>
>>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <[email protected]>
>>> wrote:
>>> >>>
>>> >>> Thanks for moving forward with this, Lukasz!
>>> >>>
>>> >>> Unfortunately, can't make it on Friday but I'll sync with somebody on
>>> >>> the call (e.g. Ryan) about your discussion.
>>> >>>
>>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>> >>> > Thanks for everyone who wanted to fill out the doodle poll. The
>>> most
>>> >>> > popular time was Friday Sept 14th from 11am-noon PST. I'll send
>>> out a
>>> >>> > calendar invite and meeting link early next week.
>>> >>> >
>>> >>> > I have received a lot of feedback on the document and have
>>> addressed
>>> >>> > some parts of it including:
>>> >>> > * clarifying terminology
>>> >>> > * processing skew due to some restrictions having their watermarks
>>> much
>>> >>> > further behind then others affecting scheduling of bundles by
>>> runners
>>> >>> > * external throttling & I/O wait overhead reporting to make sure we
>>> >>> > don't overscale
>>> >>> >
>>> >>> > Areas that still need additional feedback and details are:
>>> >>> > * reporting progress around the work that is done and is active
>>> >>> > * more examples
>>> >>> > * unbounded restrictions being caused by an unbounded number of
>>> splits
>>> >>> > of existing unbounded restrictions (infinite work growth)
>>> >>> > * whether we should be reporting this information at the PTransform
>>> >>> > level or at the bundle level
>>> >>> >
>>> >>> >
>>> >>> >
>>> >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <[email protected]
>>> >>> > <mailto:[email protected]>> wrote:
>>> >>> >
>>> >>> >     Thanks to all those who have provided interest in this topic
>>> by the
>>> >>> >     questions they have asked on the doc already and for those
>>> >>> >     interested in having this discussion. I have setup this doodle
>>> to
>>> >>> >     allow people to provide their availability:
>>> >>> >     https://doodle.com/poll/nrw7w84255xnfwqy
>>> >>> >
>>> >>> >     I'll send out the chosen time based upon peoples availability
>>> and a
>>> >>> >     Hangout link by end of day Friday so please mark your
>>> availability
>>> >>> >     using the link above.
>>> >>> >
>>> >>> >     The agenda of the meeting will be as follows:
>>> >>> >     * Overview of the proposal
>>> >>> >     * Enumerate and discuss/answer questions brought up in the
>>> meeting
>>> >>> >
>>> >>> >     Note that all questions and any discussions/answers provided
>>> will be
>>> >>> >     added to the doc for those who are unable to attend.
>>> >>> >
>>> >>> >     On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré
>>> >>> >     <[email protected] <mailto:[email protected]>> wrote:
>>> >>> >
>>> >>> >         +1
>>> >>> >
>>> >>> >         Regards
>>> >>> >         JB
>>> >>> >         Le 31 août 2018, à 18:22, Lukasz Cwik <[email protected]
>>> >>> >         <mailto:[email protected]>> a écrit:
>>> >>> >
>>> >>> >             That is possible, I'll take people's date/time
>>> suggestions
>>> >>> >             and create a simple online poll with them.
>>> >>> >
>>> >>> >             On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw
>>> >>> >             <[email protected] <mailto:[email protected]>>
>>> wrote:
>>> >>> >
>>> >>> >                 Thanks for taking this up. I added some comments
>>> to the
>>> >>> >                 doc. A European-friendly time for discussion would
>>> >>> >                 be great.
>>> >>> >
>>> >>> >                 On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik
>>> >>> >                 <[email protected] <mailto:[email protected]>>
>>> wrote:
>>> >>> >
>>> >>> >                     I came up with a proposal[1] for a progress
>>> model
>>> >>> >                     solely based off of the backlog and that splits
>>> >>> >                     should be based upon the remaining backlog we
>>> want
>>> >>> >                     the SDK to split at. I also give
>>> recommendations to
>>> >>> >                     runner authors as to how an autoscaling system
>>> could
>>> >>> >                     work based upon the measured backlog. A lot of
>>> >>> >                     discussions around progress reporting and
>>> splitting
>>> >>> >                     in the past has always been around finding an
>>> >>> >                     optimal solution, after reading a lot of
>>> information
>>> >>> >                     about work stealing, I don't believe there is a
>>> >>> >                     general solution and it really is upto
>>> >>> >                     SplittableDoFns to be well behaved. I did not
>>> do
>>> >>> >                     much work in classifying what a well behaved
>>> >>> >                     SplittableDoFn is though. Much of this work
>>> builds
>>> >>> >                     off ideas that Eugene had documented in the
>>> past[2].
>>> >>> >
>>> >>> >                     I could use the communities wide knowledge of
>>> >>> >                     different I/Os to see if computing the backlog
>>> is
>>> >>> >                     practical in the way that I'm suggesting and to
>>> >>> >                     gather people's feedback.
>>> >>> >
>>> >>> >                     If there is a lot of interest, I would like to
>>> hold
>>> >>> >                     a community video conference between Sept 10th
>>> and
>>> >>> >                     14th about this topic. Please reply with your
>>> >>> >                     availability by Sept 6th if your interested.
>>> >>> >
>>> >>> >                     1:
>>> https://s.apache.org/beam-bundles-backlog-splitting
>>> >>> >                     2: https://s.apache.org/beam-breaking-fusion
>>> >>> >
>>> >>> >                     On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste
>>> >>> >                     Onofré <[email protected] <mailto:
>>> [email protected]>> wrote:
>>> >>> >
>>> >>> >                         Awesome !
>>> >>> >
>>> >>> >                         Thanks Luke !
>>> >>> >
>>> >>> >                         I plan to work with you and others on this
>>> one.
>>> >>> >
>>> >>> >                         Regards
>>> >>> >                         JB
>>> >>> >                         Le 13 août 2018, à 19:14, Lukasz Cwik
>>> >>> >                         <[email protected] <mailto:[email protected]>>
>>> a
>>> >>> >                         écrit:
>>> >>> >
>>> >>> >                             I wanted to reach out that I will be
>>> >>> >                             continuing from where Eugene left off
>>> with
>>> >>> >                             SplittableDoFn. I know that many of
>>> you have
>>> >>> >                             done a bunch of work with IOs and/or
>>> runner
>>> >>> >                             integration for SplittableDoFn and
>>> would
>>> >>> >                             appreciate your help in advancing this
>>> >>> >                             awesome idea. If you have questions or
>>> >>> >                             things you want to get reviewed
>>> related to
>>> >>> >                             SplittableDoFn, feel free to send them
>>> my
>>> >>> >                             way or include me on anything
>>> SplittableDoFn
>>> >>> >                             related.
>>> >>> >
>>> >>> >                             I was part of several discussions with
>>> >>> >                             Eugene and I think the biggest
>>> outstanding
>>> >>> >                             design portion is to figure out how
>>> dynamic
>>> >>> >                             work rebalancing would play out with
>>> the
>>> >>> >                             portability APIs. This includes
>>> reporting of
>>> >>> >                             progress from within a bundle. I know
>>> that
>>> >>> >                             Eugene had shared some documents in
>>> this
>>> >>> >                             regard but the position / split models
>>> >>> >                             didn't work too cleanly in a unified
>>> sense
>>> >>> >                             for bounded and unbounded
>>> SplittableDoFns.
>>> >>> >                             It will likely take me awhile to
>>> gather my
>>> >>> >                             thoughts but could use your expertise
>>> as to
>>> >>> >                             how compatible these ideas are with
>>> respect
>>> >>> >                             to to IOs and runners
>>> >>> >                             Flink/Spark/Dataflow/Samza/Apex/... and
>>> >>> >                             obviously help during implementation.
>>> >>> >
>>>
>>

Reply via email to