I don't want to crash the tech discussion here, but... I just gave a session at the Beam Summit about Splittable DoFns from a user's perspective (based on what I could gather from the documentation and experimentation). Here is the slide deck; maybe it could be useful: https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing (quite proud of the animations, though ;-)
_/
_/ Alex Van Boxel

On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik wrote:
> Reuven, just inside the restriction tracker itself, which is scoped per executing SplittableDoFn. A user could still write the synchronization incorrectly, though, since they are currently responsible for writing it.
>
> On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax wrote:
>> Is synchronization over an entire work item, or just inside the restriction tracker? My concern is that some runners (especially streaming runners) might have hundreds or thousands of parallel work items being processed for the same SDF (for different keys), and I'm afraid of creating lock-contention bottlenecks.
>>
>> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik wrote:
>>> The synchronization is related to Java thread safety, since concurrent access to a restriction tracker is likely to be needed to properly handle reading the backlog and splitting while the user's DoFn is executing and updating the restriction tracker. This is similar to the Java thread safety needed in BoundedSource and UnboundedSource for fraction consumed, backlog bytes, and splitting.
>>>
>>> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax wrote:
>>>> Can you give details on what the synchronization is per? Is it per key, or global to each worker?
>>>>
>>>> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik wrote:
>>>>> As I was looking at the SplittableDoFn API while working towards a proposal for how the backlog/splitting API could look, I found some sharp edges that could be improved.
>>>>>
>>>>> I noticed that:
>>>>> 1) We require users to write thread-safe code, which is something we haven't asked of users when writing a DoFn.
>>>>> 2) We expose "internal" methods within the RestrictionTracker that are not meant to be used by the runner.
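To make the thread-safety requirement concrete, here is a minimal sketch that assumes a simple offset range [from, to) claimed in increasing order. `SynchronizedOffsetTracker` and its methods are hypothetical names, not Beam's `OffsetRangeTracker` API. The user's processing thread calls `tryClaim`, and a runner thread may read the backlog or split at the same time. All three methods lock on the tracker instance, so, matching the answer to Reuven above, the lock covers one executing restriction rather than a key or a whole worker.

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch, not Beam's OffsetRangeTracker: tracks the half-open
 * offset range [from, to). The user's DoFn thread calls tryClaim while a
 * runner thread may call backlog() or trySplitKeepingBacklog() concurrently,
 * so every method synchronizes on this tracker instance. The lock is per
 * tracker (one per executing restriction), not per key or per worker.
 */
class SynchronizedOffsetTracker {
  private long to;          // exclusive end; shrinks when a split succeeds
  private long lastClaimed; // last successfully claimed offset, or from - 1 if none

  SynchronizedOffsetTracker(long from, long to) {
    if (from > to) {
      throw new IllegalArgumentException("from must be <= to");
    }
    this.to = to;
    this.lastClaimed = from - 1;
  }

  /** User thread: returns false once the offset falls outside the (possibly split) range. */
  synchronized boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  /** Runner thread: number of offsets in the range that have not been claimed yet. */
  synchronized long backlog() {
    return to - (lastClaimed + 1);
  }

  /**
   * Runner thread: keep at most {@code keep} unclaimed offsets in this range.
   * On success, returns the split position; the residual is [splitPos, previous to).
   */
  synchronized OptionalLong trySplitKeepingBacklog(long keep) {
    long splitPos = lastClaimed + 1 + Math.max(0, keep);
    if (splitPos >= to) {
      return OptionalLong.empty(); // nothing left to hand off
    }
    to = splitPos;
    return OptionalLong.of(splitPos);
  }
}
```

Because the split and the claim use the same lock, a claim can never succeed for an offset that a concurrent split has already handed to the residual.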
>>>>>
>>>>> I can fix these issues by giving the user a forwarding restriction tracker[1] that provides an appropriate level of synchronization as needed and also provides the necessary observation hooks to see when a claim failed or succeeded.
>>>>>
>>>>> This requires a change to our experimental API, since we need to pass a RestrictionTracker to the @ProcessElement method instead of a subtype of RestrictionTracker:
>>>>>   @ProcessElement
>>>>>   public void processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
>>>>> becomes:
>>>>>   @ProcessElement
>>>>>   public void processElement(ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { ... }
>>>>>
>>>>> This provides an additional benefit: it prevents users from working around the RestrictionTracker APIs and potentially making changes to the underlying tracker outside of the tryClaim call.
>>>>>
>>>>> The full implementation is available in this PR[2], and I was wondering what people thought.
>>>>>
>>>>> 1: https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>>>>> 2: https://github.com/apache/beam/pull/6467
>>>>>
>>>>> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik wrote:
>>>>>> The changes to the API have not been proposed yet. So far it has all been about what the representation is and why.
>>>>>>
>>>>>> For splitting, the current idea is to use the backlog as a way of telling the SplittableDoFn where to split, so it would be in terms of whatever the SDK decided to report.
>>>>>> The runner always chooses a backlog number relative to the SDK's reported backlog. It would be up to the SDK to round/clamp the number given by the runner to represent something meaningful for itself.
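Here is a sketch of the forwarding idea, built on a stripped-down tracker contract. `Tracker`, `Range`, `UnsyncOffsetTracker`, and `ForwardingTracker` are hypothetical stand-ins and are not the classes in the linked PR. The user's tracker has no synchronization of its own. The wrapper serializes every call on one lock and reports each claim outcome to an observer. Because user code only sees the interface type `Tracker<Range, Long>`, it cannot reach subtype-specific methods outside of `tryClaim`.

```java
import java.util.function.BiConsumer;

/** Hypothetical restriction type: the half-open offset range [from, to). */
final class Range {
  final long from;
  final long to;

  Range(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

/** Hypothetical minimal tracker contract; Beam's real RestrictionTracker has more methods. */
interface Tracker<R, P> {
  boolean tryClaim(P position);

  R currentRestriction();
}

/** A user-written tracker with no synchronization of its own. */
final class UnsyncOffsetTracker implements Tracker<Range, Long> {
  private final Range range;
  private long lastClaimed;

  UnsyncOffsetTracker(Range range) {
    this.range = range;
    this.lastClaimed = range.from - 1;
  }

  @Override
  public boolean tryClaim(Long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= range.to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  @Override
  public Range currentRestriction() {
    return range;
  }
}

/**
 * Forwarding wrapper a runner could pass to processElement in place of the
 * user's concrete tracker. It serializes every call on one lock and reports
 * each claim outcome to an observer (for example, to drive progress reporting).
 */
final class ForwardingTracker<R, P> implements Tracker<R, P> {
  private final Tracker<R, P> delegate;
  private final BiConsumer<P, Boolean> claimObserver;

  ForwardingTracker(Tracker<R, P> delegate, BiConsumer<P, Boolean> claimObserver) {
    this.delegate = delegate;
    this.claimObserver = claimObserver;
  }

  @Override
  public synchronized boolean tryClaim(P position) {
    boolean claimed = delegate.tryClaim(position);
    claimObserver.accept(position, claimed); // runs under the lock; keep it cheap
    return claimed;
  }

  @Override
  public synchronized R currentRestriction() {
    return delegate.currentRestriction();
  }
}
```

In this sketch the observer runs on the claiming thread while the lock is held. That keeps the observed claim sequence consistent, but the observer then needs to be cheap.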
>>>>>> For example, if the backlog the SDK was reporting was bytes remaining in a file, such as 500, then the runner could provide a value like 212.2, which the SDK would then round to 212.
>>>>>> If the backlog the SDK was reporting was 57 Pub/Sub messages, then the runner could provide a value like 300, which would mean reading the 57 messages and then another 243 as part of the current restriction.
>>>>>>
>>>>>> I believe that BoundedSource/UnboundedSource will have wrappers added that provide a basic SplittableDoFn implementation, so existing IOs should be able to migrate over without API changes.
>>>>>>
>>>>>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía wrote:
>>>>>>> Thanks a lot, Luke, for bringing this back to the mailing list, and Ryan for taking the notes.
>>>>>>>
>>>>>>> I would like to know if there was some discussion, or if you have given some thought, to the required changes in the SDK (API) part. What will be the equivalent of `splitAtFraction`, and what should IO authors do to support it?
>>>>>>>
>>>>>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik wrote:
>>>>>>>>
>>>>>>>> Thanks to everyone who joined and for the questions asked.
>>>>>>>>
>>>>>>>> Ryan graciously collected notes of the discussion: https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>>>>>>>
>>>>>>>> The summary was that we should bring BoundedSource/UnboundedSource onto a unified backlog-reporting mechanism, with optional other signals that Dataflow has found useful (such as whether the remaining restriction is splittable: yes, no, or unknown). Other runners can use them or not. SDFs should report backlog and watermark as the minimum bar.
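The two backlog examples above can be worked through in code. This sketch assumes the runner's requested number means "how much of the remaining work the current restriction keeps" and rounds it down. The thread leaves rounding and clamping to the SDK, so the rounding mode here is just one possible choice. `BacklogSplits` and both method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Hypothetical helpers (not a Beam API) that interpret a runner-requested
 * backlog as "how much of the remaining work the current restriction keeps".
 * Rounding and clamping are the SDK's choice; this sketch rounds down.
 */
final class BacklogSplits {
  private BacklogSplits() {}

  /** Rounds a fractional request down to a whole unit, e.g. 212.2 -> 212. */
  private static long roundDown(BigDecimal requested) {
    return requested.setScale(0, RoundingMode.FLOOR).longValueExact();
  }

  /**
   * Bounded byte range: bytes the current (primary) restriction keeps, clamped
   * to [0, remainingBytes]. The residual receives remainingBytes minus the result.
   */
  static long bytesToKeep(BigDecimal requested, long remainingBytes) {
    return Math.max(0, Math.min(roundDown(requested), remainingBytes));
  }

  /**
   * Unbounded message source: a request larger than the reported backlog means
   * the current restriction should keep reading past the messages known so far.
   * Returns how many messages beyond the reported backlog it should read.
   */
  static long extraMessagesToRead(BigDecimal requested, long reportedMessages) {
    return Math.max(0, roundDown(requested) - reportedMessages);
  }
}
```

Under these assumptions, a request of 212.2 against 500 remaining bytes keeps 212 bytes in the primary and leaves 288 for the residual. A request of 300 against 57 reported messages means reading 243 more.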
>>>>>>>> The backlog should use an arbitrary-precision decimal type such as Java BigDecimal, to prevent issues where limited precision removes the ability to compute deltas efficiently.
>>>>>>>>
>>>>>>>> On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik wrote:
>>>>>>>>> Here is the link to join the discussion: https://meet.google.com/idc-japs-hwf
>>>>>>>>> Remember that it is this Friday, Sept 14th, from 11am to noon PST.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels wrote:
>>>>>>>>>> Thanks for moving forward with this, Lukasz!
>>>>>>>>>>
>>>>>>>>>> Unfortunately, I can't make it on Friday, but I'll sync with somebody on the call (e.g. Ryan) about your discussion.
>>>>>>>>>>
>>>>>>>>>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>>>>>>>>>> Thanks to everyone who filled out the Doodle poll. The most popular time was Friday, Sept 14th, from 11am to noon PST. I'll send out a calendar invite and meeting link early next week.
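The summary above sets a minimum report of backlog plus watermark, with an optional yes/no/unknown splittability signal, and recommends an arbitrary-precision backlog. This sketch models that report with `BigDecimal`. `ProgressReport` and `Splittable` are hypothetical names, not a Beam API, and using `java.time.Instant` for the watermark is an assumption.

```java
import java.math.BigDecimal;
import java.time.Instant;

/** Hypothetical tri-state signal: is the remaining restriction splittable? */
enum Splittable { YES, NO, UNKNOWN }

/**
 * Hypothetical progress report (not a Beam API). Backlog and watermark are the
 * minimum an SDF reports. The splittability signal is optional, so it defaults
 * to UNKNOWN and runners are free to ignore it.
 */
final class ProgressReport {
  final BigDecimal backlog; // arbitrary precision, in whatever unit the SDK chose
  final Instant watermark;
  final Splittable splittable;

  ProgressReport(BigDecimal backlog, Instant watermark) {
    this(backlog, watermark, Splittable.UNKNOWN);
  }

  ProgressReport(BigDecimal backlog, Instant watermark, Splittable splittable) {
    this.backlog = backlog;
    this.watermark = watermark;
    this.splittable = splittable;
  }

  /** Work consumed between two reports; BigDecimal subtraction is exact. */
  static BigDecimal consumedBetween(ProgressReport earlier, ProgressReport later) {
    return earlier.backlog.subtract(later.backlog);
  }
}
```

This illustrates the precision argument. For backlogs of 10000000000000000000 and 9999999999999999999, `consumedBetween` returns exactly 1. Subtracting the same two values after `doubleValue()` gives 0.0, because both round to the same `double`.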
>>>>>>>>>>>
>>>>>>>>>>> I have received a lot of feedback on the document and have addressed some parts of it, including:
>>>>>>>>>>> * clarifying terminology
>>>>>>>>>>> * processing skew due to some restrictions having their watermarks much further behind than others, affecting how runners schedule bundles
>>>>>>>>>>> * external throttling & I/O wait overhead reporting, to make sure we don't overscale
>>>>>>>>>>>
>>>>>>>>>>> Areas that still need additional feedback and details are:
>>>>>>>>>>> * reporting progress on the work that is done and the work that is active
>>>>>>>>>>> * more examples
>>>>>>>>>>> * unbounded restrictions being caused by an unbounded number of splits of existing unbounded restrictions (infinite work growth)
>>>>>>>>>>> * whether we should be reporting this information at the PTransform level or at the bundle level
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik wrote:
>>>>>>>>>>>> Thanks to all those who have shown interest in this topic through the questions they have already asked on the doc, and to those interested in having this discussion. I have set up this Doodle poll to allow people to provide their availability: https://doodle.com/poll/nrw7w84255xnfwqy
>>>>>>>>>>>>
>>>>>>>>>>>> I'll send out the chosen time, based upon people's availability, and a Hangout link by end of day Friday, so please mark your availability using the link above.
>>>>>>>>>>>>
>>>>>>>>>>>> The agenda of the meeting will be as follows:
>>>>>>>>>>>> * Overview of the proposal
>>>>>>>>>>>> * Enumerate and discuss/answer questions brought up in the meeting
>>>>>>>>>>>>
>>>>>>>>>>>> Note that all questions and any discussions/answers provided will be added to the doc for those who are unable to attend.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré wrote:
>>>>>>>>>>>>> +1
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> JB
>>>>>>>>>>>>> On Aug 31, 2018, at 18:22, Lukasz Cwik wrote:
>>>>>>>>>>>>>> That is possible; I'll take people's date/time suggestions and create a simple online poll with them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw wrote:
>>>>>>>>>>>>>>> Thanks for taking this up. I added some comments to the doc. A European-friendly time for discussion would be great.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik wrote:
>>>>>>>>>>>>>>>> I came up with a proposal[1] for a progress model based solely on the backlog, in which splits are expressed in terms of the remaining backlog we want the SDK to split at. I also give recommendations to runner authors as to how an autoscaling system could work based upon the measured backlog.
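Here is one way a backlog-driven autoscaling rule could also respect the throttling/I/O-wait concern listed earlier. This sketch is an illustration only and not the rule from the linked proposal. `BacklogAutoscaler`, the target drain time, and the throttling threshold are all hypothetical. The rule sizes the pool so the backlog drains within the target time. It refuses to scale up while workers spend most of their time throttled, because more workers would not drain an externally limited backlog faster.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Hypothetical backlog-driven autoscaling rule (an illustration only): size the
 * pool so the backlog drains within a target time, but do not scale up while
 * workers are mostly throttled or waiting on I/O. In that case the bottleneck
 * is outside the pipeline and extra workers would be overscaling.
 */
final class BacklogAutoscaler {
  private final double targetDrainSeconds;
  private final double maxThrottledFraction;
  private final int maxWorkers;

  BacklogAutoscaler(double targetDrainSeconds, double maxThrottledFraction, int maxWorkers) {
    if (targetDrainSeconds <= 0 || maxWorkers < 1) {
      throw new IllegalArgumentException("need a positive drain target and at least one worker");
    }
    this.targetDrainSeconds = targetDrainSeconds;
    this.maxThrottledFraction = maxThrottledFraction;
    this.maxWorkers = maxWorkers;
  }

  /**
   * @param backlog           total reported backlog, in the SDK's units
   * @param perWorkerRate     measured units processed per second by one worker
   * @param throttledFraction fraction of worker time spent throttled or in I/O wait
   * @param currentWorkers    current pool size
   */
  int desiredWorkers(BigDecimal backlog, double perWorkerRate, double throttledFraction, int currentWorkers) {
    if (perWorkerRate <= 0) {
      return currentWorkers; // no throughput measurement yet: hold steady
    }
    BigDecimal unitsPerWorker = BigDecimal.valueOf(perWorkerRate * targetDrainSeconds);
    int needed = backlog
        .divide(unitsPerWorker, 0, RoundingMode.CEILING) // workers needed, rounded up
        .max(BigDecimal.ONE)
        .min(BigDecimal.valueOf(maxWorkers))
        .intValueExact();
    if (needed > currentWorkers && throttledFraction > maxThrottledFraction) {
      return currentWorkers; // scaling up would not help an externally limited source
    }
    return needed;
  }
}
```

For example, with a 60 s target, a rate of 10 units/s per worker, and a backlog of 12,000 units, the rule asks for 20 workers. If the throttled fraction is above the threshold, it keeps the current pool instead. Scaling down is still allowed while throttled.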
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Much of the past discussion around progress reporting and splitting has been about finding an optimal solution. After reading a lot of material on work stealing, I don't believe there is a general solution, and it really is up to SplittableDoFns to be well behaved. I did not do much work on classifying what a well-behaved SplittableDoFn is, though. Much of this work builds off ideas that Eugene had documented in the past[2].
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I could use the community's wide knowledge of different I/Os to see if computing the backlog is practical in the way that I'm suggesting, and to gather people's feedback.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If there is a lot of interest, I would like to hold a community video conference about this topic between Sept 10th and 14th. Please reply with your availability by Sept 6th if you're interested.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1: https://s.apache.org/beam-bundles-backlog-splitting
>>>>>>>>>>>>>>>> 2: https://s.apache.org/beam-breaking-fusion
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste Onofré wrote:
>>>>>>>>>>>>>>>>> Awesome!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks, Luke!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I plan to work with you and others on this one.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>>>> On Aug 13, 2018, at 19:14, Lukasz Cwik wrote:
>>>>>>>>>>>>>>>>>> I wanted to let you know that I will be continuing from where Eugene left off with SplittableDoFn. I know that many of you have done a bunch of work on IOs and/or runner integration for SplittableDoFn, and I would appreciate your help in advancing this awesome idea. If you have questions, or things related to SplittableDoFn that you want reviewed, feel free to send them my way or include me on anything SplittableDoFn related.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I was part of several discussions with Eugene, and I think the biggest outstanding design portion is figuring out how dynamic work rebalancing would play out with the portability APIs. This includes reporting progress from within a bundle. I know that Eugene had shared some documents in this regard, but the position/split models didn't work too cleanly in a unified sense for bounded and unbounded SplittableDoFns. It will likely take me a while to gather my thoughts, but I could use your expertise as to how compatible these ideas are with respect to IOs and runners (Flink/Spark/Dataflow/Samza/Apex/...) and, obviously, your help during implementation.
