On Wed, Feb 7, 2024 at 5:15 PM Robert Burke <lostl...@apache.org> wrote:
> OK, so my stance is a configurable Reshuffle might be interesting, so my
> vote is +1, along the following lines.
>
> 1. Use a new URN (beam:transform:reshuffle:v2) and attach a new
> ReshufflePayload to it.
>

Ah, I see there's more than one variation of the "new URN" approach. Namely, you have a new version of an existing URN prefix, while I had in mind that it was a totally new base URN. In other words the open question I meant to pose is between these options:

1. beam:transform:reshuffle:v2 + { allowing_duplicates: true }
2. beam:transform:reshuffle_allowing_duplicates:v1 {}

The most compelling argument in favor of option 2 is that it could have a distinct payload type associated with the different URN (maybe parameters around tweaking how much duplication? I don't know... I actually expect neither payload to evolve much if at all).

There were also two comments in favor of option 2 on the design doc.

> -> Unknown "urns for composite transforms" already default to the
> subtransform graph implementation for most (all?) runners.
> -> Having a payload to toggle this behavior then can have whatever
> desired behavior we like. It also allows for additional configurations
> added in later on. This is preferable to a plethora of one-off urns IMHO.
> We can have SDKs gate configuration combinations as needed if additional
> ones appear.
>
> 2. It's very cheap to add but also ignore, as the default is "Do what
> we're already doing without change", and not all SDKs need to add it right
> away. It's more important that the portable way is defined at least, so
> it's easy for other SDKs to add and handle it.
>
> I would prefer we have a clear starting point on what Reshuffle does
> though. I remain a fan of "The Reshuffle (v2) Transform is a user
> designated hint to a runner for a change in parallelism. By default, it
> produces an output PCollection that has the same elements as the input
> PCollection".
>

+1 this is a better phrasing of the spec I propose in https://s.apache.org/beam-redistribute but let's not get into it here if we can, and just evaluate the delta from that design to https://s.apache.org/beam-reshuffle-allowing-duplicates

Kenn

> It remains an open question about what that means for
> checkpointing/durability behavior, but that's largely been runner dependent
> anyway. I admit the above definition is biased by the uses of Reshuffle I'm
> aware of, which largely are to incur a fusion break in the execution graph.
>
> Robert Burke
> Beam Go Busybody
>
> On 2024/01/31 16:01:33 Kenneth Knowles wrote:
> > On Wed, Jan 31, 2024 at 4:21 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > > Hi,
> > >
> > > if I understand this proposal correctly, the motivation is actually
> > > reducing latency by bypassing bundle atomic guarantees, bundles after
> > > "at least once" Reshuffle would be reconstructed independently of the
> > > pre-shuffle bundling. Provided this is correct, it seems that the
> > > behavior is slightly more general than for the case of Reshuffle. We
> > > have already some transforms that manipulate a specific property of a
> > > PCollection - if it may or might not contain duplicates. That is
> > > manipulated in two ways - explicitly removing duplicates based on IDs
> > > on sources that generate duplicates and using @RequiresStableInput,
> > > mostly in sinks. These techniques modify an inherent property of a
> > > PCollection, that is if it contains or does not contain possible
> > > duplicates originating from the same input element.
> > >
> > > There are two types of duplicates - duplicate elements in _different
> > > bundles_ (typically from at-least-once sources) and duplicates arising
> > > due to bundle reprocessing (affecting only transforms with
> > > side-effects, that is what we solve by @RequiresStableInput).
> > > The point I'm trying to get to - should we add these properties to
> > > PCollections (contains cross-bundle duplicates vs. does not) and
> > > PTransforms ("outputs deduplicated elements" and "requires stable
> > > input")? That would allow us to analyze the Pipeline DAG and provide
> > > appropriate implementation for Reshuffle automatically, so that a new
> > > URN or flag would not be needed. Moreover, this might be useful for a
> > > broader range of optimizations.
> > >
> > > WDYT?
> > >
> > These are interesting ideas that could be useful. I think they achieve a
> > different goal in my case. I actually want to explicitly allow
> > Reshuffle.allowingDuplicates() to skip expensive parts of its
> > implementation that are used to prevent duplicates.
> >
> > The property that would make it possible to automate this in the case of
> > combiners, or at least validate that the pipeline still gives 100%
> > accurate answers, would be something like
> > @InsensitiveToDuplicateElements which is longer and less esoteric than
> > @Idempotent. For situations where there is a source or sink that only
> > has at-least-once guarantees then yea maybe the property "has
> > duplicates" will let you know that you may as well use the duplicating
> > reshuffle without any loss. But still, you may not want to introduce
> > *more* duplicates.
> >
> > I would say my proposal is a step in this direction that would gain some
> > experience and tools that we might later use in a more automated way.
> >
> > Kenn
> >
> > > Jan
> > > On 1/30/24 23:22, Robert Burke wrote:
> > >
> > > Is the benefit of this proposal just the bounded deviation from the
> > > existing reshuffle?
> > >
> > > Reshuffle is already rather dictated by arbitrary runner choice, from
> > > simply ignoring the node, to forcing a materialization break, to a full
> > > shuffle implementation which has additional side effects.
> > >
> > > But model wise I don't believe it guarantees specific checkpointing or
> > > re-execution behavior as currently specified. The proto only says it
> > > represents the operation (without specifying the behavior, that is a
> > > big problem).
> > >
> > > I guess my concern here is that it implies/codifies that the existing
> > > reshuffle has more behavior than it promises outside of the Java SDK.
> > >
> > > "Allowing duplicates" WRT reshuffle is tricky. It feels like it mostly
> > > allows an implementation that may mean the inputs into the reshuffle
> > > might be re-executed for example. But that's always under the runner's
> > > discretion, and ultimately it could also prevent even getting the
> > > intended benefit of a reshuffle (notionally, just a fusion break).
> > >
> > > Is there even a valid way to implement the notion of a reshuffle that
> > > leads to duplicates outside of a retry/resilience case?
> > >
> > > -------
> > >
> > > To be clear, I'm not against the proposal. I'm against that it's being
> > > built on a non-existent foundation. If the behavior isn't already
> > > defined, it's impossible to specify a real deviation from it.
> > >
> > > I'm all for more specific behaviors if it means we actually clarify
> > > what the original version is in the protos, since it's news to me
> > > (just now, because I looked) that the Java reshuffle promises GBK-like
> > > side effects. But that's a long deprecated transform without a
> > > satisfying replacement for its usage, so it may be moot.
> > >
> > > Robert Burke
> > >
> > > On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles <k...@apache.org> wrote:
> > >
> > >> Hi all,
> > >>
> > >> Just when you thought I had squeezed all the possible interest out of
> > >> this most boring-seeming of transforms :-)
> > >>
> > >> I wrote up a very quick proposal as a doc [1].
> > >> It is short enough that I will also put the main idea and main
> > >> question in this email so you can quickly read. Best to put comments
> > >> in the doc.
> > >>
> > >> Main idea: add a variation of Reshuffle that allows duplicates, aka
> > >> "at least once", so that users and runners can benefit from efficiency
> > >> if it is possible
> > >>
> > >> Main question: is it best as a parameter to existing reshuffle
> > >> transforms or as new URN(s)? I have proposed it as a parameter but I
> > >> think either one could work.
> > >>
> > >> I would love feedback on the main idea, main question, or anywhere on
> > >> the doc.
> > >>
> > >> Thanks!
> > >>
> > >> Kenn
> > >>
> > >> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates
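
For anyone skimming the thread, the main question (a parameter on one URN vs. a separate URN) could be sketched roughly as below. This is only an illustration in plain Python, not Beam code: the Transform class, the plan() function and the strategy names it returns are hypothetical. The two URN strings and the allowing_duplicates field are the ones quoted in the thread, and the last branch mirrors the remark that unknown composite URNs fall back to the subtransform graph.

```python
from dataclasses import dataclass, field

# URN strings as quoted in the thread; they are proposals under discussion.
RESHUFFLE_V2 = "beam:transform:reshuffle:v2"
RESHUFFLE_ALLOWING_DUPLICATES_V1 = "beam:transform:reshuffle_allowing_duplicates:v1"


@dataclass
class Transform:
    """Hypothetical stand-in for a transform node in a portable pipeline."""
    urn: str
    payload: dict = field(default_factory=dict)


def plan(transform: Transform) -> str:
    """Return the (made-up) strategy name a runner might choose."""
    if transform.urn == RESHUFFLE_V2:
        # Option 1: one URN, behavior toggled by a payload field.
        # A missing field means "do what we're already doing".
        if transform.payload.get("allowing_duplicates", False):
            return "reshuffle_allowing_duplicates"
        return "reshuffle_without_duplicates"
    if transform.urn == RESHUFFLE_ALLOWING_DUPLICATES_V1:
        # Option 2: a distinct URN with its own (here empty) payload.
        return "reshuffle_allowing_duplicates"
    # Unknown composite URN: run the expanded subtransform graph instead.
    return "expand_subtransforms"


if __name__ == "__main__":
    examples = [
        Transform(RESHUFFLE_V2, {"allowing_duplicates": True}),
        Transform(RESHUFFLE_V2),
        Transform(RESHUFFLE_ALLOWING_DUPLICATES_V1),
        Transform("beam:transform:something_unknown:v1"),
    ]
    for t in examples:
        print(t.urn, t.payload, "->", plan(t))
```

Under option 1 a runner that does not understand the payload field still sees a URN it knows, while under option 2 an older runner sees an unknown URN and takes the fallback branch. Which of those is preferable is exactly what the thread leaves open.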