OK, so my stance is that a configurable Reshuffle might be interesting, so my vote is +1, along the following lines.
1. Use a new URN (beam:transform:reshuffle:v2) and attach a new ReshufflePayload to it.
   -> Unknown URNs for composite transforms already default to the subtransform graph implementation for most (all?) runners.
   -> A payload that toggles this behavior can then specify whatever behavior we like. It also allows additional configurations to be added later on. This is preferable to a plethora of one-off URNs, IMHO. SDKs can gate configuration combinations as needed if additional ones appear.
2. It's very cheap to add but also to ignore, since the default is "do what we're already doing, without change", and not all SDKs need to add it right away. It's more important that the portable way is at least defined, so that it's easy for other SDKs to add and handle it.

I would prefer that we have a clear starting point on what Reshuffle does, though. I remain a fan of "The Reshuffle (v2) transform is a user-designated hint to a runner for a change in parallelism. By default, it produces an output PCollection that has the same elements as the input PCollection." It remains an open question what that means for checkpointing/durability behavior, but that's largely been runner dependent anyway.

I admit the above definition is biased by the uses of Reshuffle I'm aware of, which are largely to incur a fusion break in the execution graph.

Robert Burke
Beam Go Busybody

On 2024/01/31 16:01:33 Kenneth Knowles wrote:
> On Wed, Jan 31, 2024 at 4:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> > Hi,
> >
> > If I understand this proposal correctly, the motivation is actually reducing latency by bypassing bundle atomicity guarantees: bundles after an "at least once" Reshuffle would be reconstructed independently of the pre-shuffle bundling. Provided this is correct, it seems that the behavior is slightly more general than the case of Reshuffle.
> > We already have some transforms that manipulate a specific property of a PCollection: whether or not it may contain duplicates. That property is manipulated in two ways: explicitly removing duplicates based on IDs, for sources that generate duplicates, and using @RequiresStableInput, mostly in sinks. These techniques modify an inherent property of a PCollection, that is, whether or not it may contain duplicates originating from the same input element.
> >
> > There are two types of duplicates: duplicate elements in _different bundles_ (typically from at-least-once sources) and duplicates arising from bundle reprocessing (affecting only transforms with side effects, which is what we solve with @RequiresStableInput). The point I'm trying to get to: should we add these properties to PCollections ("contains cross-bundle duplicates" vs. "does not") and to PTransforms ("outputs deduplicated elements" and "requires stable input")? That would allow us to analyze the Pipeline DAG and provide an appropriate implementation of Reshuffle automatically, so that a new URN or flag would not be needed. Moreover, this might be useful for a broader range of optimizations.
> >
> > WDYT?
>
> These are interesting ideas that could be useful. I think they achieve a different goal in my case. I actually want to explicitly allow Reshuffle.allowingDuplicates() to skip expensive parts of its implementation that are used to prevent duplicates.
>
> The property that would make it possible to automate this in the case of combiners, or at least to validate that the pipeline still gives 100% accurate answers, would be something like @InsensitiveToDuplicateElements, which is longer and less esoteric than @Idempotent.
For situations where there is a > source or sink that only has at-least-once guarantees then yea maybe the > property "has duplicates" will let you know that you may as well use the > duplicating reshuffle without any loss. But still, you may not want to > introduce *more* duplicates. > > I would say my proposal is a step in this direction that would gain some > experience and tools that we might later use in a more automated way. > > Kenn > > > Jan > > On 1/30/24 23:22, Robert Burke wrote: > > > > Is the benefit of this proposal just the bounded deviation from the > > existing reshuffle? > > > > Reshuffle is already rather dictated by arbitrary runner choice, from > > simply ignoring the node, to forcing a materialization break, to a full > > shuffle implementation which has additional side effects. > > > > But model wise I don't believe it guarantees specific checkpointing or > > re-execution behavior as currently specified. The proto only says it > > represents the operation (without specifying the behavior, that is a big > > problem). > > > > I guess my concern here is that it implies/codifies that the existing > > reshuffle has more behavior than it promises outside of the Java SDK. > > > > "Allowing duplicates" WRT reshuffle is tricky. It feels like mostly allows > > an implementation that may mean the inputs into the reshuffle might be > > re-executed for example. But that's always under the runner's discretion , > > and ultimately it could also prevent even getting the intended benefit of a > > reshuffle (notionally, just a fusion break). > > > > Is there even a valid way to implement the notion of a reshuffle that > > leads to duplicates outside of a retry/resilience case? > > > > ------- > > > > To be clear, I'm not against the proposal. I'm against that its being > > built on a non-existent foundation. If the behavior isn't already defined, > > it's impossible to specify a real deviation from it. 
> >
> > I'm all for more specific behaviors if it means we actually clarify what the original version is in the protos, since it's news to me (just now, because I looked) that the Java reshuffle promises GBK-like side effects. But that's a long-deprecated transform without a satisfying replacement for its usage, so it may be moot.
> >
> > Robert Burke
> >
> > On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles <k...@apache.org> wrote:
> >
> >> Hi all,
> >>
> >> Just when you thought I had squeezed all the possible interest out of this most boring-seeming of transforms :-)
> >>
> >> I wrote up a very quick proposal as a doc [1]. It is short enough that I will also put the main idea and main question in this email so you can read them quickly. Best to put comments in the doc.
> >>
> >> Main idea: add a variation of Reshuffle that allows duplicates, aka "at least once", so that users and runners can benefit from the efficiency gain where possible.
> >>
> >> Main question: is it best as a parameter to the existing reshuffle transforms or as new URN(s)? I have proposed it as a parameter, but I think either one could work.
> >>
> >> I would love feedback on the main idea, the main question, or anywhere in the doc.
> >>
> >> Thanks!
> >>
> >> Kenn
> >>
> >> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates
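A minimal Go sketch of the dispatch described in point 1 of the reply at the top, assuming a much-simplified, hypothetical model of a runner. The `Transform` and `ReshufflePayload` types, the `AllowDuplicates` field, the `plan` function, and the subtransform names are all illustrative; they are not Beam's actual proto definitions or runner API. The sketch shows the two properties the proposal leans on: a runner that doesn't recognize beam:transform:reshuffle:v2 falls back to the composite's subtransforms, and a missing or zero-valued payload means "do what we're already doing".

```go
package main

import "fmt"

// Transform is a hypothetical, much-simplified pipeline node: a URN, an
// optional Reshuffle payload, and the subtransforms of the composite.
type Transform struct {
	URN           string
	Payload       *ReshufflePayload // nil: no payload attached
	Subtransforms []string
}

// ReshufflePayload is hypothetical: the thread proposes that a payload
// exist, not what it contains. AllowDuplicates models the "at least once"
// variant; the zero value means "do what we're already doing".
type ReshufflePayload struct {
	AllowDuplicates bool
}

const reshuffleV2URN = "beam:transform:reshuffle:v2"

// plan reports how a hypothetical runner would execute t, given the set
// of URNs it implements natively.
func plan(t Transform, known map[string]bool) []string {
	if !known[t.URN] {
		// Unknown composite URN: execute the subtransform graph instead.
		return t.Subtransforms
	}
	if t.Payload != nil && t.Payload.AllowDuplicates {
		return []string{"native at-least-once reshuffle"}
	}
	// Missing or zero-valued payload: keep the existing behavior.
	return []string{"native reshuffle (current behavior)"}
}

func main() {
	tr := Transform{
		URN:           reshuffleV2URN,
		Payload:       &ReshufflePayload{AllowDuplicates: true},
		Subtransforms: []string{"AssignRandomKey", "GroupByKey", "DropKey"},
	}
	fmt.Println(plan(tr, map[string]bool{}))                     // runner that doesn't know v2
	fmt.Println(plan(tr, map[string]bool{reshuffleV2URN: true})) // runner that does
}
```

Making the payload's zero value equal to today's behavior is what keeps the new URN cheap to add but also cheap to ignore: an SDK or runner that never sets or reads the payload sees no change.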