Was that only October? Wow. Option 2 SGTM, with the adjustment of making the core of the URN "redistribute_allowing_duplicates" instead of building on the unspecified Reshuffle semantics.
Transforms that get updated to use the new transform can have the @RequiresStableInput annotation added if they need that property, per previous discussions.

On Thu, Feb 8, 2024, 10:31 AM Kenneth Knowles <k...@apache.org> wrote:

> On Wed, Feb 7, 2024 at 5:15 PM Robert Burke <lostl...@apache.org> wrote:
>
>> OK, so my stance is a configurable Reshuffle might be interesting, so my
>> vote is +1, along the following lines.
>>
>> 1. Use a new URN (beam:transform:reshuffle:v2) and attach a new
>> ReshufflePayload to it.
>
> Ah, I see there's more than one variation of the "new URN" approach.
> Namely, you have a new version of an existing URN prefix, while I had in
> mind that it was a totally new base URN. In other words the open question I
> meant to pose is between these options:
>
> 1. beam:transform:reshuffle:v2 + { allowing_duplicates: true }
> 2. beam:transform:reshuffle_allowing_duplicates:v1 {}
>
> The most compelling argument in favor of option 2 is that it could have a
> distinct payload type associated with the different URN (maybe parameters
> around tweaking how much duplication? I don't know... I actually expect
> neither payload to evolve much if at all).
>
> There were also two comments in favor of option 2 on the design doc.
>
>> -> Unknown "urns for composite transforms" already default to the
>> subtransform graph implementation for most (all?) runners.
>> -> Having a payload to toggle this behavior then can have whatever
>> desired behavior we like. It also allows for additional configurations
>> added in later on. This is preferable to a plethora of one-off urns IMHO.
>> We can have SDKs gate configuration combinations as needed if additional
>> ones appear.
>>
>> 2. It's very cheap to add but also ignore, as the default is "Do what
>> we're already doing without change", and not all SDKs need to add it
>> right away.
>> It's more important that the portable way is defined at least, so
>> it's easy for other SDKs to add and handle it.
>>
>> I would prefer we have a clear starting point on what Reshuffle does
>> though. I remain a fan of "The Reshuffle (v2) Transform is a user
>> designated hint to a runner for a change in parallelism. By default, it
>> produces an output PCollection that has the same elements as the input
>> PCollection".
>
> +1 this is a better phrasing of the spec I propose in
> https://s.apache.org/beam-redistribute but let's not get into it here if
> we can, and just evaluate the delta from that design to
> https://s.apache.org/beam-reshuffle-allowing-duplicates
>
> Kenn
>
>> It remains an open question about what that means for
>> checkpointing/durability behavior, but that's largely been runner
>> dependent anyway. I admit the above definition is biased by the uses of
>> Reshuffle I'm aware of, which largely are to incur a fusion break in the
>> execution graph.
>>
>> Robert Burke
>> Beam Go Busybody
>>
>> On 2024/01/31 16:01:33 Kenneth Knowles wrote:
>> > On Wed, Jan 31, 2024 at 4:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > > Hi,
>> > >
>> > > if I understand this proposal correctly, the motivation is actually
>> > > reducing latency by bypassing bundle atomic guarantees, bundles after
>> > > "at least once" Reshuffle would be reconstructed independently of the
>> > > pre-shuffle bundling. Provided this is correct, it seems that the
>> > > behavior is slightly more general than for the case of Reshuffle. We
>> > > have already some transforms that manipulate a specific property of a
>> > > PCollection - if it may or might not contain duplicates. That is
>> > > manipulated in two ways - explicitly removing duplicates based on IDs
>> > > on sources that generate duplicates and using @RequiresStableInput,
>> > > mostly in sinks.
>> > > These techniques modify an inherent property of a PCollection, that
>> > > is if it contains or does not contain possible duplicates originating
>> > > from the same input element.
>> > >
>> > > There are two types of duplicates - duplicate elements in _different
>> > > bundles_ (typically from at-least-once sources) and duplicates arising
>> > > due to bundle reprocessing (affecting only transforms with
>> > > side-effects, that is what we solve by @RequiresStableInput). The
>> > > point I'm trying to get to - should we add these properties to
>> > > PCollections (contains cross-bundle duplicates vs. does not) and
>> > > PTransforms ("outputs deduplicated elements" and "requires stable
>> > > input")? That would allow us to analyze the Pipeline DAG and provide
>> > > appropriate implementation for Reshuffle automatically, so that a new
>> > > URN or flag would not be needed. Moreover, this might be useful for a
>> > > broader range of optimizations.
>> > >
>> > > WDYT?
>> > >
>> > These are interesting ideas that could be useful. I think they achieve a
>> > different goal in my case. I actually want to explicitly allow
>> > Reshuffle.allowingDuplicates() to skip expensive parts of its
>> > implementation that are used to prevent duplicates.
>> >
>> > The property that would make it possible to automate this in the case of
>> > combiners, or at least validate that the pipeline still gives 100%
>> > accurate answers, would be something like @InsensitiveToDuplicateElements
>> > which is longer and less esoteric than @Idempotent. For situations where
>> > there is a source or sink that only has at-least-once guarantees then
>> > yea maybe the property "has duplicates" will let you know that you may
>> > as well use the duplicating reshuffle without any loss. But still, you
>> > may not want to introduce *more* duplicates.
>> >
>> > I would say my proposal is a step in this direction that would gain some
>> > experience and tools that we might later use in a more automated way.
>> >
>> > Kenn
>> >
>> > > Jan
>> > > On 1/30/24 23:22, Robert Burke wrote:
>> > >
>> > > Is the benefit of this proposal just the bounded deviation from the
>> > > existing reshuffle?
>> > >
>> > > Reshuffle is already rather dictated by arbitrary runner choice, from
>> > > simply ignoring the node, to forcing a materialization break, to a
>> > > full shuffle implementation which has additional side effects.
>> > >
>> > > But model wise I don't believe it guarantees specific checkpointing or
>> > > re-execution behavior as currently specified. The proto only says it
>> > > represents the operation (without specifying the behavior, that is a
>> > > big problem).
>> > >
>> > > I guess my concern here is that it implies/codifies that the existing
>> > > reshuffle has more behavior than it promises outside of the Java SDK.
>> > >
>> > > "Allowing duplicates" WRT reshuffle is tricky. It feels like it mostly
>> > > allows an implementation that may mean the inputs into the reshuffle
>> > > might be re-executed for example. But that's always under the runner's
>> > > discretion, and ultimately it could also prevent even getting the
>> > > intended benefit of a reshuffle (notionally, just a fusion break).
>> > >
>> > > Is there even a valid way to implement the notion of a reshuffle that
>> > > leads to duplicates outside of a retry/resilience case?
>> > >
>> > > -------
>> > >
>> > > To be clear, I'm not against the proposal. I'm against it being
>> > > built on a non-existent foundation. If the behavior isn't already
>> > > defined, it's impossible to specify a real deviation from it.
>> > >
>> > > I'm all for more specific behaviors if it means we actually clarify
>> > > what the original version is in the protos, since it's news to me
>> > > (just now, because I looked) that the Java reshuffle promises GBK-like
>> > > side effects. But that's a long deprecated transform without a
>> > > satisfying replacement for its usage, so it may be moot.
>> > >
>> > > Robert Burke
>> > >
>> > > On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles <k...@apache.org> wrote:
>> > >
>> > >> Hi all,
>> > >>
>> > >> Just when you thought I had squeezed all the possible interest out of
>> > >> this most boring-seeming of transforms :-)
>> > >>
>> > >> I wrote up a very quick proposal as a doc [1]. It is short enough
>> > >> that I will also put the main idea and main question in this email so
>> > >> you can quickly read. Best to put comments in the doc.
>> > >>
>> > >> Main idea: add a variation of Reshuffle that allows duplicates, aka
>> > >> "at least once", so that users and runners can benefit from efficiency
>> > >> if it is possible
>> > >>
>> > >> Main question: is it best as a parameter to existing reshuffle
>> > >> transforms or as new URN(s)? I have proposed it as a parameter but I
>> > >> think either one could work.
>> > >>
>> > >> I would love feedback on the main idea, main question, or anywhere on
>> > >> the doc.
>> > >>
>> > >> Thanks!
>> > >>
>> > >> Kenn
>> > >>
>> > >> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates
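
For anyone skimming the thread, the two encodings contrasted above (a versioned URN with a payload flag vs. a distinct URN with an empty payload) can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the two URN strings are quoted from the thread, while TransformSpec, option_1, option_2 and may_emit_duplicates are hypothetical names, not part of any Beam SDK or the Beam protos.

```python
from dataclasses import dataclass, field

# URN strings as quoted in the thread. Everything else in this sketch
# (class, function names, dict-based payload) is hypothetical.
RESHUFFLE_V2 = "beam:transform:reshuffle:v2"
RESHUFFLE_ALLOWING_DUPLICATES_V1 = "beam:transform:reshuffle_allowing_duplicates:v1"


@dataclass(frozen=True)
class TransformSpec:
    """Stand-in for a portable transform spec: a URN plus a payload."""
    urn: str
    payload: dict = field(default_factory=dict)


def option_1() -> TransformSpec:
    # Option 1: versioned URN, behavior toggled by a payload field.
    return TransformSpec(RESHUFFLE_V2, {"allowing_duplicates": True})


def option_2() -> TransformSpec:
    # Option 2: distinct URN, empty payload.
    return TransformSpec(RESHUFFLE_ALLOWING_DUPLICATES_V1)


def may_emit_duplicates(spec: TransformSpec) -> bool:
    """How a hypothetical runner might read either encoding."""
    if spec.urn == RESHUFFLE_ALLOWING_DUPLICATES_V1:
        return True
    if spec.urn == RESHUFFLE_V2:
        # A missing field means "do what we're already doing".
        return bool(spec.payload.get("allowing_duplicates", False))
    # Unknown URN: assume the conservative, duplicate-free behavior.
    return False
```

With option 1 the runner has to inspect the payload to learn whether duplicates are allowed; with option 2 the URN alone carries that information, which is the trade-off the thread is weighing.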