Reuven, the synchronization is just inside the restriction tracker itself, which is scoped per executing SplittableDoFn. A user could still write the synchronization incorrectly, though, since they are currently responsible for writing it.
On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax <[email protected]> wrote:
> Is synchronization over an entire work item, or just inside the restriction
> tracker? My concern is that some runners (especially streaming runners)
> might have hundreds or thousands of parallel work items being processed for
> the same SDF (for different keys), and I'm afraid of creating
> lock-contention bottlenecks.
>
> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik <[email protected]> wrote:
>
>> The synchronization is related to Java thread safety, since concurrent
>> access to a restriction tracker is likely to be needed to properly handle
>> accessing the backlog and splitting concurrently with the user's DoFn
>> executing and updating the restriction tracker. This is similar to the
>> Java thread safety needed in BoundedSource and UnboundedSource for
>> fraction consumed, backlog bytes, and splitting.
>>
>> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <[email protected]> wrote:
>>
>>> Can you give details on what the synchronization is per? Is it per key,
>>> or global to each worker?
>>>
>>> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> As I was looking at the SplittableDoFn API while working towards making
>>>> a proposal for how the backlog/splitting API could look, I found some sharp
>>>> edges that could be improved.
>>>>
>>>> I noticed that:
>>>> 1) We require users to write thread-safe code; this is something that
>>>> we haven't asked of users when writing a DoFn.
>>>> 2) We expose "internal" methods within the RestrictionTracker that are not
>>>> meant to be used by the runner.
>>>>
>>>> I can fix these issues by giving the user a forwarding restriction
>>>> tracker[1] that provides an appropriate level of synchronization as needed
>>>> and also provides the necessary observation hooks to see when a claim
>>>> failed or succeeded.
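To make the per-tracker scoping concrete, here is a minimal sketch in plain Java. It assumes a deliberately simplified tracker interface: `SimpleTracker`, `ClaimObserver`, `OffsetTracker`, `SynchronizedForwardingTracker`, and `runDemo` are hypothetical names for illustration, not Beam's actual RestrictionTracker API or the code in the PR. The wrapper takes the lock of its own instance on every call, so a runner thread reading the backlog or checkpointing cannot race with the DoFn thread calling `tryClaim`, while the user-written tracker contains no locking at all. It also reports every claim outcome to an observer.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: all types here are hypothetical simplifications, not Beam's API. */
class ForwardingTrackerSketch {

  /** Hypothetical, simplified restriction tracker over positions of type P. */
  interface SimpleTracker<P> {
    boolean tryClaim(P position);
    long backlog();       // remaining work in tracker-defined units
    long[] checkpoint();  // stop after the last claimed position; returns residual {from, to}
  }

  /** Hypothetical hook that lets the runner observe claim outcomes. */
  interface ClaimObserver<P> {
    void onClaimed(P position);
    void onClaimFailed(P position);
  }

  /** A user-written tracker over offsets [from, to) with no synchronization at all. */
  static final class OffsetTracker implements SimpleTracker<Long> {
    private long to;
    private long lastClaimed;

    OffsetTracker(long from, long to) {
      this.to = to;
      this.lastClaimed = from - 1;
    }

    @Override public boolean tryClaim(Long offset) {
      if (offset <= lastClaimed) throw new IllegalArgumentException("offsets must increase");
      if (offset >= to) return false;
      lastClaimed = offset;
      return true;
    }

    @Override public long backlog() { return to - (lastClaimed + 1); }

    @Override public long[] checkpoint() {
      long oldTo = to;
      to = lastClaimed + 1;  // the primary keeps everything claimed so far
      return new long[] {to, oldTo};
    }
  }

  /** Forwards every call under this instance's lock and reports each claim outcome. */
  static final class SynchronizedForwardingTracker<P> implements SimpleTracker<P> {
    private final SimpleTracker<P> delegate;
    private final ClaimObserver<P> observer;

    SynchronizedForwardingTracker(SimpleTracker<P> delegate, ClaimObserver<P> observer) {
      this.delegate = delegate;
      this.observer = observer;
    }

    @Override public synchronized boolean tryClaim(P position) {
      boolean claimed = delegate.tryClaim(position);
      if (claimed) {
        observer.onClaimed(position);
      } else {
        observer.onClaimFailed(position);
      }
      return claimed;
    }

    @Override public synchronized long backlog() { return delegate.backlog(); }

    @Override public synchronized long[] checkpoint() { return delegate.checkpoint(); }
  }

  /** Returns {claimed, failedClaims, residualFrom, residualTo, backlogSeenByRunner}. */
  static long[] runDemo(long size) {
    List<Long> claimed = new ArrayList<>();
    List<Long> failed = new ArrayList<>();
    SimpleTracker<Long> tracker = new SynchronizedForwardingTracker<>(
        new OffsetTracker(0, size),
        new ClaimObserver<Long>() {
          @Override public void onClaimed(Long p) { claimed.add(p); }
          @Override public void onClaimFailed(Long p) { failed.add(p); }
        });

    // The "DoFn" thread claims offsets until a claim fails, like a processElement loop.
    Thread doFn = new Thread(() -> {
      for (long offset = 0; tracker.tryClaim(offset); offset++) {
        // process the record at `offset`
      }
    });
    doFn.start();

    // Meanwhile the "runner" (this thread) reads the backlog and checkpoints.
    long backlogSeen = tracker.backlog();
    long[] residual = tracker.checkpoint();
    try {
      doFn.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new long[] {claimed.size(), failed.size(), residual[0], residual[1], backlogSeen};
  }

  public static void main(String[] args) {
    long[] r = runDemo(100_000);
    System.out.printf("claimed=%d failedClaims=%d residual=[%d, %d)%n", r[0], r[1], r[2], r[3]);
  }
}
```

Because each wrapper locks only itself, trackers for different keys (different work items) never contend on a shared lock; contention is limited to one DoFn thread and the runner's progress/split requests for that single restriction. Whichever way the race goes, the number of claimed offsets equals the start of the residual.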
>>>>
>>>> This requires a change to our experimental API, since we need to pass
>>>> a RestrictionTracker to the @ProcessElement method instead of a sub-type of
>>>> RestrictionTracker:
>>>>
>>>>   @ProcessElement
>>>>   public void processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
>>>>
>>>> becomes:
>>>>
>>>>   @ProcessElement
>>>>   public void processElement(ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { ... }
>>>>
>>>> This provides the additional benefit of preventing users from working
>>>> around the RestrictionTracker APIs and potentially making underlying
>>>> changes to the tracker outside of the tryClaim call.
>>>>
>>>> The full implementation is available in this PR[2]; I was wondering
>>>> what people thought.
>>>>
>>>> 1: https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>>>> 2: https://github.com/apache/beam/pull/6467
>>>>
>>>> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> The changes to the API have not been proposed yet. So far it has all
>>>>> been about what the representation is and why.
>>>>>
>>>>> For splitting, the current idea has been to use the backlog as a way
>>>>> of telling the SplittableDoFn where to split, so it would be in terms
>>>>> of whatever the SDK decided to report.
>>>>> The runner always chooses a number for the backlog that is relative to
>>>>> the SDK's reported backlog. It would be up to the SDK to round/clamp the
>>>>> number given by the runner to represent something meaningful for itself.
>>>>> For example, if the backlog that the SDK was reporting was bytes
>>>>> remaining in a file, such as 500, then the runner could provide some
>>>>> value like 212.2, which the SDK would then round to 212.
>>>>> If the backlog that the SDK was reporting was 57 Pub/Sub messages, then
>>>>> the runner could provide a value like 300, which would mean to read those
>>>>> 57 messages and then another 243 as part of the current restriction.
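One way an SDK might interpret the runner's number, sketched under assumptions: a byte-range restriction whose reported backlog is `end - position`, and a message-count backlog for an unbounded source such as Pub/Sub. The class and method names and the choice of `RoundingMode.HALF_EVEN` are hypothetical; the discussion only says the SDK would round/clamp the value.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helpers for interpreting a runner-requested backlog; not a Beam API. */
class BacklogSplitSketch {

  /** Rounds the runner-provided backlog to whole units (bytes, messages, ...). */
  static long toWholeUnits(BigDecimal requested) {
    return requested.setScale(0, RoundingMode.HALF_EVEN).longValueExact();
  }

  /**
   * Byte-range restriction currently at `position` with exclusive end `end`, so the
   * reported backlog is end - position. Returns the split offset: the current
   * restriction keeps [position, split) and the residual is [split, end).
   */
  static long byteSplitOffset(long position, long end, BigDecimal requestedBacklog) {
    long keep = Math.max(0, Math.min(end - position, toWholeUnits(requestedBacklog)));
    return position + keep;
  }

  /**
   * Unbounded source reporting `reportedMessages` as its backlog. A larger request
   * means: read the reported messages, then this many more in the current restriction.
   */
  static long extraMessagesToRead(long reportedMessages, BigDecimal requestedBacklog) {
    return Math.max(0, toWholeUnits(requestedBacklog) - reportedMessages);
  }

  public static void main(String[] args) {
    // Reported backlog: 500 bytes (position 1000, end 1500); the runner asks for 212.2.
    System.out.println(byteSplitOffset(1000, 1500, new BigDecimal("212.2")));  // 1212
    // Reported backlog: 57 messages; the runner asks for 300.
    System.out.println(extraMessagesToRead(57, new BigDecimal("300")));         // 243
  }
}
```

Clamping to `end - position` keeps a bounded restriction from being told to retain more than it has left, whereas in the unbounded case a request above the reported backlog is meaningful and translates into extra messages to read.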
>>>>>
>>>>> I believe that BoundedSource/UnboundedSource will have wrappers added
>>>>> that provide a basic SplittableDoFn implementation, so existing IOs can
>>>>> be migrated over without API changes.
>>>>>
>>>>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <[email protected]> wrote:
>>>>>
>>>>>> Thanks a lot Luke for bringing this back to the mailing list, and Ryan
>>>>>> for taking the notes.
>>>>>>
>>>>>> I would like to know if there was some discussion, or if you guys have
>>>>>> given some thought to the required changes in the SDK (API) part. What
>>>>>> will be the equivalent of `splitAtFraction`, and what should IO authors
>>>>>> do to support it?
>>>>>>
>>>>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <[email protected]> wrote:
>>>>>> >
>>>>>> > Thanks to everyone who joined and for the questions asked.
>>>>>> >
>>>>>> > Ryan graciously collected notes of the discussion:
>>>>>> > https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>>>>> >
>>>>>> > The summary was that we should bring BoundedSource/UnboundedSource
>>>>>> > into a unified backlog-reporting mechanism, with optional additional
>>>>>> > signals that Dataflow has found useful (such as whether the remaining
>>>>>> > restriction is splittable: yes, no, or unknown). Other runners can use
>>>>>> > these signals or not. SDFs should report backlog and watermark as the
>>>>>> > minimum bar. The backlog should use an arbitrary-precision number such
>>>>>> > as Java BigDecimal, to prevent issues where limited precision removes
>>>>>> > the ability to compute deltas efficiently.
>>>>>> >
>>>>>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <[email protected]> wrote:
>>>>>> >>
>>>>>> >> Here is the link to join the discussion:
>>>>>> >> https://meet.google.com/idc-japs-hwf
>>>>>> >> Remember that it is this Friday Sept 14th from 11am-noon PST.
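The precision argument for BigDecimal can be seen with a small, hypothetical example (the class and method names are illustrative only): once a backlog is large enough, a `double` cannot represent a change of one unit, so the delta between two successive backlog reports collapses to zero, while `BigDecimal` keeps it exact.

```java
import java.math.BigDecimal;

/** Hypothetical illustration of why a double-valued backlog can lose small deltas. */
class BacklogPrecisionSketch {

  /** Delta between two successive backlog reports, computed in double precision. */
  static double doubleDelta(double before, double after) {
    return before - after;
  }

  /** The same delta computed with arbitrary precision. */
  static BigDecimal exactDelta(BigDecimal before, BigDecimal after) {
    return before.subtract(after);
  }

  public static void main(String[] args) {
    // A large backlog (about 1e17 units) from which exactly one unit of work completed.
    BigDecimal before = new BigDecimal("100000000000000001");
    BigDecimal after = new BigDecimal("100000000000000000");
    System.out.println(doubleDelta(before.doubleValue(), after.doubleValue()));  // 0.0
    System.out.println(exactDelta(before, after));                               // 1
  }
}
```

Near 1e17, adjacent `double` values are 16 apart, so completing a single unit of work is invisible to a runner that differences double-valued backlog reports.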
>>>>>> >>
>>>>>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <[email protected]> wrote:
>>>>>> >>>
>>>>>> >>> Thanks for moving forward with this, Lukasz!
>>>>>> >>>
>>>>>> >>> Unfortunately, I can't make it on Friday, but I'll sync with somebody
>>>>>> >>> on the call (e.g. Ryan) about your discussion.
>>>>>> >>>
>>>>>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>>>>> >>> > Thanks to everyone who filled out the Doodle poll. The most
>>>>>> >>> > popular time was Friday Sept 14th from 11am-noon PST. I'll send out a
>>>>>> >>> > calendar invite and meeting link early next week.
>>>>>> >>> >
>>>>>> >>> > I have received a lot of feedback on the document and have addressed
>>>>>> >>> > some parts of it, including:
>>>>>> >>> > * clarifying terminology
>>>>>> >>> > * processing skew due to some restrictions having their watermarks much
>>>>>> >>> >   further behind than others, affecting scheduling of bundles by runners
>>>>>> >>> > * external throttling & I/O wait overhead reporting to make sure we
>>>>>> >>> >   don't overscale
>>>>>> >>> >
>>>>>> >>> > Areas that still need additional feedback and details are:
>>>>>> >>> > * reporting progress around the work that is done and is active
>>>>>> >>> > * more examples
>>>>>> >>> > * unbounded restrictions being caused by an unbounded number of splits
>>>>>> >>> >   of existing unbounded restrictions (infinite work growth)
>>>>>> >>> > * whether we should be reporting this information at the PTransform
>>>>>> >>> >   level or at the bundle level
>>>>>> >>> >
>>>>>> >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <[email protected]> wrote:
>>>>>> >>> >
>>>>>> >>> > Thanks to all those who have shown interest in this topic through the
>>>>>> >>> > questions they have already asked on the doc, and to those
>>>>>> >>> > interested in having this discussion.
>>>>>> >>> > I have set up this Doodle poll to allow people to provide their
>>>>>> >>> > availability: https://doodle.com/poll/nrw7w84255xnfwqy
>>>>>> >>> >
>>>>>> >>> > I'll send out the chosen time based upon people's availability and a
>>>>>> >>> > Hangout link by end of day Friday, so please mark your availability
>>>>>> >>> > using the link above.
>>>>>> >>> >
>>>>>> >>> > The agenda of the meeting will be as follows:
>>>>>> >>> > * Overview of the proposal
>>>>>> >>> > * Enumerate and discuss/answer questions brought up in the meeting
>>>>>> >>> >
>>>>>> >>> > Note that all questions and any discussions/answers provided will be
>>>>>> >>> > added to the doc for those who are unable to attend.
>>>>>> >>> >
>>>>>> >>> > On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>> >>> >
>>>>>> >>> > +1
>>>>>> >>> >
>>>>>> >>> > Regards
>>>>>> >>> > JB
>>>>>> >>> > On Aug 31, 2018, at 18:22, Lukasz Cwik <[email protected]> wrote:
>>>>>> >>> >
>>>>>> >>> > That is possible. I'll take people's date/time suggestions and
>>>>>> >>> > create a simple online poll with them.
>>>>>> >>> >
>>>>>> >>> > On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw <[email protected]> wrote:
>>>>>> >>> >
>>>>>> >>> > Thanks for taking this up. I added some comments to the doc. A
>>>>>> >>> > European-friendly time for discussion would be great.
>>>>>> >>> >
>>>>>> >>> > On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik <[email protected]> wrote:
>>>>>> >>> >
>>>>>> >>> > I came up with a proposal[1] for a progress model based solely on
>>>>>> >>> > the backlog, in which splits are expressed in terms of the
>>>>>> >>> > remaining backlog we want the SDK to split at.
>>>>>> >>> > I also give recommendations to runner authors as to how an
>>>>>> >>> > autoscaling system could work based upon the measured backlog. Much
>>>>>> >>> > of the past discussion around progress reporting and splitting has
>>>>>> >>> > been about finding an optimal solution; after reading a lot of
>>>>>> >>> > information about work stealing, I don't believe there is a general
>>>>>> >>> > solution, and it really is up to SplittableDoFns to be well behaved.
>>>>>> >>> > I did not do much work in classifying what a well-behaved
>>>>>> >>> > SplittableDoFn is, though. Much of this work builds off ideas that
>>>>>> >>> > Eugene had documented in the past[2].
>>>>>> >>> >
>>>>>> >>> > I could use the community's wide knowledge of different I/Os to see
>>>>>> >>> > if computing the backlog is practical in the way that I'm suggesting
>>>>>> >>> > and to gather people's feedback.
>>>>>> >>> >
>>>>>> >>> > If there is a lot of interest, I would like to hold a community video
>>>>>> >>> > conference between Sept 10th and 14th about this topic. Please reply
>>>>>> >>> > with your availability by Sept 6th if you're interested.
>>>>>> >>> >
>>>>>> >>> > 1: https://s.apache.org/beam-bundles-backlog-splitting
>>>>>> >>> > 2: https://s.apache.org/beam-breaking-fusion
>>>>>> >>> >
>>>>>> >>> > On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>> >>> >
>>>>>> >>> > Awesome!
>>>>>> >>> >
>>>>>> >>> > Thanks Luke!
>>>>>> >>> >
>>>>>> >>> > I plan to work with you and others on this one.
>>>>>> >>> >
>>>>>> >>> > Regards
>>>>>> >>> > JB
>>>>>> >>> > On Aug 13, 2018, at 19:14, Lukasz Cwik <[email protected]> wrote:
>>>>>> >>> >
>>>>>> >>> > I wanted to reach out to say that I will be continuing from where
>>>>>> >>> > Eugene left off with SplittableDoFn. I know that many of you have
>>>>>> >>> > done a bunch of work with IOs and/or runner integration for
>>>>>> >>> > SplittableDoFn, and I would appreciate your help in advancing this
>>>>>> >>> > awesome idea. If you have questions or things you want reviewed
>>>>>> >>> > related to SplittableDoFn, feel free to send them my way or include
>>>>>> >>> > me on anything SplittableDoFn related.
>>>>>> >>> >
>>>>>> >>> > I was part of several discussions with Eugene, and I think the
>>>>>> >>> > biggest outstanding design portion is to figure out how dynamic work
>>>>>> >>> > rebalancing would play out with the portability APIs. This includes
>>>>>> >>> > reporting of progress from within a bundle. I know that Eugene had
>>>>>> >>> > shared some documents in this regard, but the position/split models
>>>>>> >>> > didn't work too cleanly in a unified sense for bounded and unbounded
>>>>>> >>> > SplittableDoFns. It will likely take me a while to gather my
>>>>>> >>> > thoughts, but I could use your expertise as to how compatible these
>>>>>> >>> > ideas are with respect to IOs and runners
>>>>>> >>> > (Flink/Spark/Dataflow/Samza/Apex/...) and, obviously, help during
>>>>>> >>> > implementation.
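The proposal mentions recommendations for backlog-driven autoscaling; those live in the linked document and are not reproduced here. As a hedged illustration only, one simple policy a runner could use is to size the worker pool so that the measured backlog drains within a target time. The formula, the per-worker throughput estimate, and all names below are assumptions, and the sketch ignores the throttling and I/O-wait signals mentioned in the thread.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical backlog-driven autoscaling policy; not taken from the proposal document. */
class BacklogAutoscalerSketch {

  /**
   * Returns the worker count needed to drain `backlog` within `targetSeconds`,
   * given the throughput one worker sustains (backlog units per second),
   * clamped to [minWorkers, maxWorkers]. perWorkerRate and targetSeconds must be positive.
   */
  static int desiredWorkers(BigDecimal backlog, BigDecimal perWorkerRate,
                            long targetSeconds, int minWorkers, int maxWorkers) {
    BigDecimal capacityPerWorker = perWorkerRate.multiply(BigDecimal.valueOf(targetSeconds));
    int needed = backlog.divide(capacityPerWorker, 0, RoundingMode.CEILING).intValueExact();
    return Math.max(minWorkers, Math.min(maxWorkers, needed));
  }

  public static void main(String[] args) {
    // 9,000,000 bytes of backlog, 10,000 bytes/s per worker, drain within 60 s.
    System.out.println(desiredWorkers(
        new BigDecimal("9000000"), new BigDecimal("10000"), 60, 1, 100));  // 15
  }
}
```

Using `RoundingMode.CEILING` rounds any fractional worker up, so a backlog slightly above what 15 workers can drain in the target window yields 16.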
