Can you give details on the scope of the synchronization? Is it per key, or global to each worker?
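To make the question concrete, here is a minimal sketch of a forwarding tracker with claim observation hooks. It assumes the wrapper synchronizes on itself, so the lock would be scoped to one tracker instance rather than to a key or a worker. That scoping is only an assumption for illustration. All names here (Tracker, ClaimObserver, SimpleOffsetTracker, SynchronizedForwardingTracker, claimAll) are hypothetical and are not the types in the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class ForwardingTrackerSketch {

  /** Hypothetical stand-in for a restriction tracker; not Beam's actual class. */
  interface Tracker<PositionT> {
    boolean tryClaim(PositionT position);
  }

  /** Hypothetical hooks that let a runner observe claim outcomes. */
  interface ClaimObserver<PositionT> {
    void onClaimed(PositionT position);

    void onClaimFailed(PositionT position);
  }

  /** User-written tracker over the half-open range [from, to); not thread safe. */
  static final class SimpleOffsetTracker implements Tracker<Long> {
    private final long from;
    private final long to;
    private long lastClaimed = Long.MIN_VALUE;

    SimpleOffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public boolean tryClaim(Long offset) {
      // Reject offsets outside the range or not strictly increasing.
      if (offset < from || offset >= to || offset <= lastClaimed) {
        return false;
      }
      lastClaimed = offset;
      return true;
    }
  }

  /** Wrapper handed to user code: adds locking and reports each claim outcome. */
  static final class SynchronizedForwardingTracker<PositionT> implements Tracker<PositionT> {
    private final Tracker<PositionT> delegate;
    private final ClaimObserver<PositionT> observer;

    SynchronizedForwardingTracker(
        Tracker<PositionT> delegate, ClaimObserver<PositionT> observer) {
      this.delegate = delegate;
      this.observer = observer;
    }

    // Assumption: the lock is this wrapper instance, i.e. one lock per tracker.
    @Override
    public synchronized boolean tryClaim(PositionT position) {
      boolean claimed = delegate.tryClaim(position);
      if (claimed) {
        observer.onClaimed(position);
      } else {
        observer.onClaimFailed(position);
      }
      return claimed;
    }
  }

  /** Claims offsets starting at 'from' until a claim fails; returns the observed events. */
  public static List<String> claimAll(long from, long to) {
    List<String> events = new ArrayList<>();
    ClaimObserver<Long> observer =
        new ClaimObserver<Long>() {
          @Override
          public void onClaimed(Long position) {
            events.add("claimed " + position);
          }

          @Override
          public void onClaimFailed(Long position) {
            events.add("failed " + position);
          }
        };
    Tracker<Long> tracker =
        new SynchronizedForwardingTracker<>(new SimpleOffsetTracker(from, to), observer);
    long offset = from;
    while (tracker.tryClaim(offset)) {
      offset++;
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(claimAll(0, 3));
  }
}
```

In this sketch the user's tracker stays single-threaded and only the wrapper takes a lock, so a per-key or per-worker scope would need a different lock object than the one assumed here.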
On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <[email protected]> wrote:

> As I was looking at the SplittableDoFn API while working towards making a proposal for how the backlog/splitting API could look, I found some sharp edges that could be improved.
>
> I noticed that:
> 1) We require users to write thread-safe code, which is something that we haven't asked of users when writing a DoFn.
> 2) We have "internal" methods within the RestrictionTracker that are not meant to be used by the runner.
>
> I can fix these issues by giving the user a forwarding restriction tracker[1] that provides an appropriate level of synchronization as needed and also provides the necessary observation hooks to see when a claim failed or succeeded.
>
> This requires a change to our experimental API since we need to pass a RestrictionTracker to the @ProcessElement method instead of a sub-type of RestrictionTracker.
>   @ProcessElement
>   processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
> becomes:
>   @ProcessElement
>   processElement(ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { ... }
>
> This provides an additional benefit: it prevents users from working around the RestrictionTracker APIs and potentially making underlying changes to the tracker outside of the tryClaim call.
>
> Full implementation is available within this PR[2] and I was wondering what people thought.
>
> 1: https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
> 2: https://github.com/apache/beam/pull/6467
>
> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <[email protected]> wrote:
>
>> The changes to the API have not been proposed yet. So far it has all been about what the representation is and why.
>>
>> For splitting, the current idea has been about using the backlog as a way of telling the SplittableDoFn where to split, so it would be in terms of whatever the SDK decided to report.
>> The runner always chooses a number for backlog that is relative to the SDK's reported backlog. It would be up to the SDK to round/clamp the number given by the Runner to represent something meaningful for itself.
>> For example, if the backlog that the SDK was reporting was bytes remaining in a file such as 500, then the Runner could provide some value like 212.2 which the SDK would then round to 212.
>> If the backlog that the SDK was reporting was 57 pubsub messages, then the Runner could provide a value like 300 which would mean to read 57 values and then another 243 as part of the current restriction.
>>
>> I believe that BoundedSource/UnboundedSource will have wrappers added that provide a basic SplittableDoFn implementation so existing IOs should be migrated over without API changes.
>>
>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> Thanks a lot Luke for bringing this back to the mailing list and Ryan for taking the notes.
>>>
>>> I would like to know if there was some discussion, or if you guys have given some thought to the required changes in the SDK (API) part. What will be the equivalent of `splitAtFraction` and what should IO authors do to support it?
>>>
>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <[email protected]> wrote:
>>> >
>>> > Thanks to everyone who joined and for the questions asked.
>>> >
>>> > Ryan graciously collected notes of the discussion: https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>> >
>>> > The summary was to bring BoundedSource/UnboundedSource onto a unified backlog-reporting mechanism, with optional other signals that Dataflow has found useful (such as whether the remaining restriction is splittable: yes, no, or unknown). Other runners can use them or not. SDFs should report backlog and watermark as a minimum bar.
>>> > The backlog should use an arbitrary-precision float such as Java BigDecimal to prevent issues where limited precision removes the ability to compute delta efficiently.
>>> >
>>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <[email protected]> wrote:
>>> >>
>>> >> Here is the link to join the discussion: https://meet.google.com/idc-japs-hwf
>>> >> Remember that it is this Friday Sept 14th from 11am-noon PST.
>>> >>
>>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <[email protected]> wrote:
>>> >>>
>>> >>> Thanks for moving forward with this, Lukasz!
>>> >>>
>>> >>> Unfortunately, I can't make it on Friday but I'll sync with somebody on the call (e.g. Ryan) about your discussion.
>>> >>>
>>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>> >>> > Thanks to everyone who filled out the doodle poll. The most popular time was Friday Sept 14th from 11am-noon PST. I'll send out a calendar invite and meeting link early next week.
>>> >>> >
>>> >>> > I have received a lot of feedback on the document and have addressed some parts of it including:
>>> >>> > * clarifying terminology
>>> >>> > * processing skew due to some restrictions having their watermarks much further behind than others affecting scheduling of bundles by runners
>>> >>> > * external throttling & I/O wait overhead reporting to make sure we don't overscale
>>> >>> >
>>> >>> > Areas that still need additional feedback and details are:
>>> >>> > * reporting progress around the work that is done and is active
>>> >>> > * more examples
>>> >>> > * unbounded restrictions being caused by an unbounded number of splits of existing unbounded restrictions (infinite work growth)
>>> >>> > * whether we should be reporting this information at the PTransform level or at the bundle level
>>> >>> >
>>> >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <[email protected]> wrote:
>>> >>> >
>>> >>> >   Thanks to all those who have shown interest in this topic through the questions they have asked on the doc already, and to those interested in having this discussion. I have set up this doodle to allow people to provide their availability:
>>> >>> >   https://doodle.com/poll/nrw7w84255xnfwqy
>>> >>> >
>>> >>> >   I'll send out the chosen time based upon people's availability and a Hangout link by end of day Friday, so please mark your availability using the link above.
>>> >>> >
>>> >>> >   The agenda of the meeting will be as follows:
>>> >>> >   * Overview of the proposal
>>> >>> >   * Enumerate and discuss/answer questions brought up in the meeting
>>> >>> >
>>> >>> >   Note that all questions and any discussions/answers provided will be added to the doc for those who are unable to attend.
>>> >>> >
>>> >>> >   On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>> >>> >
>>> >>> >     +1
>>> >>> >
>>> >>> >     Regards
>>> >>> >     JB
>>> >>> >     On Aug 31, 2018, at 18:22, Lukasz Cwik <[email protected]> wrote:
>>> >>> >
>>> >>> >       That is possible, I'll take people's date/time suggestions and create a simple online poll with them.
>>> >>> >
>>> >>> >       On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw <[email protected]> wrote:
>>> >>> >
>>> >>> >         Thanks for taking this up. I added some comments to the doc. A European-friendly time for discussion would be great.
>>> >>> >
>>> >>> >         On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik <[email protected]> wrote:
>>> >>> >
>>> >>> >           I came up with a proposal[1] for a progress model based solely on the backlog, in which splits are based upon the remaining backlog we want the SDK to split at. I also give recommendations to runner authors as to how an autoscaling system could work based upon the measured backlog. A lot of the discussions around progress reporting and splitting in the past have been about finding an optimal solution. After reading a lot of information about work stealing, I don't believe there is a general solution, and it really is up to SplittableDoFns to be well behaved. I did not do much work in classifying what a well-behaved SplittableDoFn is, though. Much of this work builds off ideas that Eugene had documented in the past[2].
>>> >>> >
>>> >>> >           I could use the community's wide knowledge of different I/Os to see if computing the backlog is practical in the way that I'm suggesting and to gather people's feedback.
>>> >>> >
>>> >>> >           If there is a lot of interest, I would like to hold a community video conference between Sept 10th and 14th about this topic. Please reply with your availability by Sept 6th if you're interested.
>>> >>> >
>>> >>> >           1: https://s.apache.org/beam-bundles-backlog-splitting
>>> >>> >           2: https://s.apache.org/beam-breaking-fusion
>>> >>> >
>>> >>> >           On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>> >>> >
>>> >>> >             Awesome!
>>> >>> >
>>> >>> >             Thanks Luke!
>>> >>> >
>>> >>> >             I plan to work with you and others on this one.
>>> >>> >
>>> >>> >             Regards
>>> >>> >             JB
>>> >>> >             On Aug 13, 2018, at 19:14, Lukasz Cwik <[email protected]> wrote:
>>> >>> >
>>> >>> >               I wanted to let you know that I will be continuing from where Eugene left off with SplittableDoFn. I know that many of you have done a bunch of work with IOs and/or runner integration for SplittableDoFn and I would appreciate your help in advancing this awesome idea. If you have questions or things you want to get reviewed related to SplittableDoFn, feel free to send them my way or include me on anything SplittableDoFn related.
>>> >>> >
>>> >>> >               I was part of several discussions with Eugene and I think the biggest outstanding design portion is to figure out how dynamic work rebalancing would play out with the portability APIs. This includes reporting of progress from within a bundle.
>>> >>> >               I know that Eugene had shared some documents in this regard but the position / split models didn't work too cleanly in a unified sense for bounded and unbounded SplittableDoFns. It will likely take me a while to gather my thoughts but I could use your expertise as to how compatible these ideas are with respect to IOs and runners (Flink/Spark/Dataflow/Samza/Apex/...), and obviously help during implementation.
>>> >>> >
>>> >>
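
The rounding examples in the Sep 17 message quoted above (500 bytes reported with 212.2 requested, and 57 pubsub messages reported with 300 requested) could be sketched roughly as follows, together with the reason the Sep 15 summary gives for an arbitrary-precision type. This is only an illustration: the class and method names are hypothetical, and the HALF_UP rounding mode and the clamp to the reported backlog in the bounded case are assumptions, not part of the proposal.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class BacklogRoundingSketch {

  /** Rounds a runner-provided backlog to whole units, never below zero. */
  static BigDecimal toWholeUnits(BigDecimal requested) {
    // Assumption: HALF_UP; the thread only says the SDK "rounds".
    return requested.max(BigDecimal.ZERO).setScale(0, RoundingMode.HALF_UP);
  }

  /** Bounded case (bytes left in a file): also clamp to what was reported. */
  static BigDecimal forBounded(BigDecimal reported, BigDecimal requested) {
    return toWholeUnits(requested).min(reported);
  }

  /** Unbounded case (pubsub): units to read beyond the currently reported backlog. */
  static BigDecimal beyondReported(BigDecimal reported, BigDecimal requested) {
    return toWholeUnits(requested).subtract(reported).max(BigDecimal.ZERO);
  }

  /** With double, adding 1 to 1e17 is lost, so the delta comes out as 0.0. */
  static double deltaWithDouble() {
    double before = 1e17;
    double after = before + 1.0;
    return after - before;
  }

  /** With BigDecimal the same delta is preserved exactly. */
  static BigDecimal deltaWithBigDecimal() {
    BigDecimal before = new BigDecimal("100000000000000000");
    return before.add(BigDecimal.ONE).subtract(before);
  }

  public static void main(String[] args) {
    // File example: 500 bytes reported, runner asks for 212.2.
    System.out.println(forBounded(new BigDecimal("500"), new BigDecimal("212.2")));
    // Pubsub example: 57 messages reported, runner asks for 300.
    System.out.println(beyondReported(new BigDecimal("57"), new BigDecimal("300")));
    System.out.println(deltaWithDouble());
    System.out.println(deltaWithBigDecimal());
  }
}
```

The bounded helper returns 212 for the file example and the unbounded helper returns 243 for the pubsub example, matching the numbers in the quoted message.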
