Closing the loop, I went with two URNs and an associated payload in https://github.com/apache/beam/pull/30545
Kenn

On Wed, Mar 6, 2024 at 10:54 AM Kenneth Knowles <k...@apache.org> wrote:
> OK, of course, while hacking this up I see there's already a combinatorial 2x2 that perhaps people were alluding to but I missed.
>
> RedistributeByKey (user's choice)
> RedistributeArbitrarily (runner's choice! default may be random keys but that is not required)
>
> RedistributeArbitrarilyAllowingDuplicates (this is the use case I am trying to get at with the design & impl - basically runner's choice and also no need to dedup or persist)
> RedistributeByKeyAllowingDuplicates (is this an important use case? I don't know - if so, then it points to some future where you tag any transform with this)
>
> So now I kind of want to have two URNs (one per input/output type) and a config that allows duplicates.
>
> WDYT? Do the people who liked having separate URNs want to have 4 URNs? We can still have whatever end-user SDK interface we need regardless. I think in Java we want it to look like this regardless:
>
> Redistribute.arbitrarily()
> Redistribute.byKey()
> Redistribute.arbitrarily().allowingDuplicates()
> Redistribute.byKey().allowingDuplicates()
>
> And Python:
>
> beam.Redistribute()
> beam.RedistributeByKey()
> beam.Redistribute(allowDuplicates=True)
> beam.RedistributeByKey(allowDuplicates=True)
>
> I'll add end-user APIs to the design doc (and ask for help on Python and Go idioms) but they are pretty short and sweet.
>
> Kenn
>
> On Thu, Feb 8, 2024 at 1:45 PM Robert Burke <rob...@frantil.com> wrote:
>
>> Was that only October? Wow.
>>
>> Option 2 SGTM, with the adjustment of making the core of the URN "redistribute_allowing_duplicates" instead of building from the unspecified Reshuffle semantics.
>>
>> Transforms getting updated to use the new transform can have their @RequiresStableInput annotation added accordingly if they need that property, per previous discussions.
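Kenn's 2x2 above, and the "two URNs plus a payload" shape from the closing message, can be sketched as a small builder that maps four user-facing variants onto two URNs. This is a minimal Python illustration, not Beam's implementation: the URN strings, the `RedistributePayload` class, its `allow_duplicates` field and the snake_case builder methods are all assumptions for illustration, not taken from the PR.

```python
from dataclasses import dataclass, replace
from typing import Tuple

# Hypothetical URN strings, one per input/output type (unkeyed vs. keyed).
# Illustrative only; not copied from the Beam protos.
REDISTRIBUTE_ARBITRARILY_URN = "beam:transform:redistribute_arbitrarily:v1"
REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1"


@dataclass(frozen=True)
class RedistributePayload:
    """Hypothetical payload shared by both URNs."""
    allow_duplicates: bool = False


@dataclass(frozen=True)
class Redistribute:
    """Sketch of the user-facing builder discussed in the thread."""
    keyed: bool
    allow_duplicates: bool = False

    @staticmethod
    def arbitrarily() -> "Redistribute":
        return Redistribute(keyed=False)

    @staticmethod
    def by_key() -> "Redistribute":
        return Redistribute(keyed=True)

    def allowing_duplicates(self) -> "Redistribute":
        # Frozen dataclass: return a modified copy instead of mutating.
        return replace(self, allow_duplicates=True)

    def to_spec(self) -> Tuple[str, RedistributePayload]:
        # The input/output type picks the URN; the duplicate policy goes in the payload.
        urn = REDISTRIBUTE_BY_KEY_URN if self.keyed else REDISTRIBUTE_ARBITRARILY_URN
        return urn, RedistributePayload(allow_duplicates=self.allow_duplicates)


for variant in (
    Redistribute.arbitrarily(),
    Redistribute.by_key(),
    Redistribute.arbitrarily().allowing_duplicates(),
    Redistribute.by_key().allowing_duplicates(),
):
    print(variant.to_spec())
```

Four end-user variants collapse to two URNs here; the "4 URNs" alternative would instead fold the duplicate policy into the URN string itself and leave the payload empty.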
>>
>> On Thu, Feb 8, 2024, 10:31 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On Wed, Feb 7, 2024 at 5:15 PM Robert Burke <lostl...@apache.org> wrote:
>>>
>>>> OK, so my stance is that a configurable Reshuffle might be interesting, so my vote is +1, along the following lines.
>>>>
>>>> 1. Use a new URN (beam:transform:reshuffle:v2) and attach a new ReshufflePayload to it.
>>>>
>>>
>>> Ah, I see there's more than one variation of the "new URN" approach. Namely, you have a new version of an existing URN prefix, while I had in mind that it was a totally new base URN. In other words, the open question I meant to pose is between these options:
>>>
>>> 1. beam:transform:reshuffle:v2 + { allowing_duplicates: true }
>>> 2. beam:transform:reshuffle_allowing_duplicates:v1 {}
>>>
>>> The most compelling argument in favor of option 2 is that it could have a distinct payload type associated with the different URN (maybe parameters around tweaking how much duplication? I don't know... I actually expect neither payload to evolve much if at all).
>>>
>>> There were also two comments in favor of option 2 on the design doc.
>>>
>>>> -> Unknown "urns for composite transforms" already default to the subtransform graph implementation for most (all?) runners.
>>>> -> Having a payload to toggle this behavior means it can have whatever behavior we like. It also allows additional configurations to be added later on. This is preferable to a plethora of one-off URNs IMHO. We can have SDKs gate configuration combinations as needed if additional ones appear.
>>>>
>>>> 2. It's very cheap to add but also ignore, as the default is "Do what we're already doing without change", and not all SDKs need to add it right away. It's more important that the portable way is defined at least, so it's easy for other SDKs to add and handle it.
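Robert's point that unknown composite URNs already fall back to their subtransform graph is what makes a new URN "cheap to add but also ignore". A minimal sketch of that fallback, assuming a toy runner that maps URNs to native implementations: `beam:transform:pardo:v1` and `beam:transform:group_by_key:v1` are the standard primitive URNs, `beam:transform:reshuffle:v2` is the URN proposed in the thread, and the `allowing_duplicates` payload key, the `plan` function and the random-key expansion are hypothetical simplifications.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

PARDO_URN = "beam:transform:pardo:v1"
GBK_URN = "beam:transform:group_by_key:v1"
RESHUFFLE_V2_URN = "beam:transform:reshuffle:v2"  # as proposed in the thread


@dataclass
class Transform:
    urn: str
    payload: Dict[str, object] = field(default_factory=dict)
    subtransforms: List["Transform"] = field(default_factory=list)


def plan(t: Transform, native: Dict[str, Callable[[Transform], str]]) -> List[str]:
    """Return the steps a hypothetical runner would execute for `t`.

    Known URNs get the runner's native implementation; an unknown composite
    falls back to its expanded subtransform graph.
    """
    if t.urn in native:
        return [native[t.urn](t)]
    if t.subtransforms:
        steps: List[str] = []
        for sub in t.subtransforms:
            steps.extend(plan(sub, native))
        return steps
    raise ValueError(f"unknown primitive URN: {t.urn}")


# Simplified reshuffle expansion: assign random keys, group, then drop the keys.
reshuffle = Transform(
    RESHUFFLE_V2_URN,
    payload={"allowing_duplicates": True},  # hypothetical payload field
    subtransforms=[
        Transform(PARDO_URN, {"fn": "AssignRandomKey"}),
        Transform(GBK_URN),
        Transform(PARDO_URN, {"fn": "DropKey"}),
    ],
)

old_runner = {
    PARDO_URN: lambda t: f"ParDo({t.payload['fn']})",
    GBK_URN: lambda t: "GroupByKey",
}
new_runner = dict(old_runner)
new_runner[RESHUFFLE_V2_URN] = lambda t: (
    "at-least-once shuffle" if t.payload.get("allowing_duplicates") else "deduplicating shuffle"
)

print(plan(reshuffle, old_runner))  # falls back to the expansion
print(plan(reshuffle, new_runner))  # native, payload-aware implementation
```

A runner that has never heard of the new URN runs the ordinary expansion unchanged; only a runner that opts in ever reads the payload.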
>>>>
>>>> I would prefer we have a clear starting point on what Reshuffle does, though. I remain a fan of "The Reshuffle (v2) Transform is a user-designated hint to a runner for a change in parallelism. By default, it produces an output PCollection that has the same elements as the input PCollection".
>>>>
>>>
>>> +1, this is a better phrasing of the spec I propose in https://s.apache.org/beam-redistribute but let's not get into it here if we can, and just evaluate the delta from that design to https://s.apache.org/beam-reshuffle-allowing-duplicates
>>>
>>> Kenn
>>>
>>>
>>>> It remains an open question what that means for checkpointing/durability behavior, but that's largely been runner-dependent anyway. I admit the above definition is biased by the uses of Reshuffle I'm aware of, which are largely to incur a fusion break in the execution graph.
>>>>
>>>> Robert Burke
>>>> Beam Go Busybody
>>>>
>>>> On 2024/01/31 16:01:33 Kenneth Knowles wrote:
>>>> > On Wed, Jan 31, 2024 at 4:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>> >
>>>> > > Hi,
>>>> > >
>>>> > > if I understand this proposal correctly, the motivation is actually reducing latency by bypassing bundle atomicity guarantees: bundles after an "at least once" Reshuffle would be reconstructed independently of the pre-shuffle bundling. Provided this is correct, it seems that the behavior is slightly more general than the case of Reshuffle. We already have some transforms that manipulate a specific property of a PCollection: whether or not it may contain duplicates. That property is manipulated in two ways: explicitly removing duplicates based on IDs from sources that generate duplicates, and using @RequiresStableInput, mostly in sinks. These techniques modify an inherent property of a PCollection, namely whether or not it may contain duplicates originating from the same input element.
>>>> > >
>>>> > > There are two types of duplicates: duplicate elements in _different bundles_ (typically from at-least-once sources), and duplicates arising from bundle reprocessing (affecting only transforms with side effects, which is what @RequiresStableInput solves). The point I'm trying to get to: should we add these properties to PCollections (contains cross-bundle duplicates vs. does not) and PTransforms ("outputs deduplicated elements" and "requires stable input")? That would allow us to analyze the Pipeline DAG and provide an appropriate implementation for Reshuffle automatically, so that a new URN or flag would not be needed. Moreover, this might be useful for a broader range of optimizations.
>>>> > >
>>>> > > WDYT?
>>>> > >
>>>> > These are interesting ideas that could be useful. I think they achieve a different goal in my case. I actually want to explicitly allow Reshuffle.allowingDuplicates() to skip expensive parts of its implementation that are used to prevent duplicates.
>>>> >
>>>> > The property that would make it possible to automate this in the case of combiners, or at least validate that the pipeline still gives 100% accurate answers, would be something like @InsensitiveToDuplicateElements, which is longer and less esoteric than @Idempotent. For situations where there is a source or sink that only has at-least-once guarantees, then yeah, maybe the property "has duplicates" will let you know that you may as well use the duplicating reshuffle without any loss. But still, you may not want to introduce *more* duplicates.
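Jan's idea of tracking duplicate-related properties on PCollections and PTransforms, together with Kenn's hypothetical @InsensitiveToDuplicateElements, suggests a simple analysis for picking a reshuffle variant automatically. The sketch below assumes a linear pipeline and invents the `Step` flags, `may_have_duplicates` and the `choose_reshuffle` rule purely for illustration; none of these are Beam APIs.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class Step:
    """A pipeline step annotated with hypothetical duplicate-related properties."""
    name: str
    introduces_duplicates: bool = False      # e.g. an at-least-once source
    removes_duplicates: bool = False         # e.g. dedup by record ID
    insensitive_to_duplicates: bool = False  # e.g. a Max combiner


def may_have_duplicates(upstream: Sequence[Step]) -> bool:
    """Propagate a 'may contain duplicates' flag along a linear chain of steps."""
    dup = False
    for step in upstream:
        if step.removes_duplicates:
            dup = False
        if step.introduces_duplicates:
            dup = True
    return dup


def choose_reshuffle(upstream: Sequence[Step], downstream: Sequence[Step],
                     accept_more_duplicates: bool = False) -> str:
    # Safe case: every consumer gives the same answer with or without duplicates.
    if all(s.insensitive_to_duplicates for s in downstream):
        return "allowing_duplicates"
    # Weaker case from the thread: the input may already contain duplicates, but
    # the reshuffle could add more, so only opt in when the caller accepts that.
    if accept_more_duplicates and may_have_duplicates(upstream):
        return "allowing_duplicates"
    return "deduplicating"


source = Step("ReadAtLeastOnce", introduces_duplicates=True)
dedup = Step("DedupById", removes_duplicates=True)
max_combine = Step("Max", insensitive_to_duplicates=True)
sink = Step("WriteToSink")

print(choose_reshuffle([source], [max_combine]))
print(choose_reshuffle([source], [sink]))
print(choose_reshuffle([source], [sink], accept_more_duplicates=True))
```

Max is a natural example of a duplicate-insensitive combiner, whereas Sum or Count is not, which is why the decision has to look at the consumers rather than only at the source.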
>>>> >
>>>> > I would say my proposal is a step in this direction that would gain some experience and tools that we might later use in a more automated way.
>>>> >
>>>> > Kenn
>>>> >
>>>> > > Jan
>>>> > > On 1/30/24 23:22, Robert Burke wrote:
>>>> > >
>>>> > > Is the benefit of this proposal just the bounded deviation from the existing reshuffle?
>>>> > >
>>>> > > Reshuffle is already rather dictated by arbitrary runner choice, from simply ignoring the node, to forcing a materialization break, to a full shuffle implementation which has additional side effects.
>>>> > >
>>>> > > But model-wise, I don't believe it guarantees specific checkpointing or re-execution behavior as currently specified. The proto only says it represents the operation (without specifying the behavior, which is a big problem).
>>>> > >
>>>> > > I guess my concern here is that it implies/codifies that the existing reshuffle has more behavior than it promises outside of the Java SDK.
>>>> > >
>>>> > > "Allowing duplicates" WRT reshuffle is tricky. It feels like it mostly allows an implementation that may mean the inputs into the reshuffle might be re-executed, for example. But that's always at the runner's discretion, and ultimately it could also prevent even getting the intended benefit of a reshuffle (notionally, just a fusion break).
>>>> > >
>>>> > > Is there even a valid way to implement the notion of a reshuffle that leads to duplicates outside of a retry/resilience case?
>>>> > >
>>>> > > -------
>>>> > >
>>>> > > To be clear, I'm not against the proposal. I'm against its being built on a non-existent foundation. If the behavior isn't already defined, it's impossible to specify a real deviation from it.
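On Robert's question of how a reshuffle could produce duplicates, the retry/resilience case looks like this in miniature. The sketch assumes a toy shuffle that keeps whatever a failed attempt already wrote and then re-runs the whole bundle. Removing repeats by record ID is one possible way to pay for exactly-once output (a runner might instead rely on transactional commits), and that extra step is the kind of cost an allowing-duplicates variant could skip. All names here are hypothetical.

```python
from typing import List, Tuple

Record = Tuple[str, str]  # (record_id, value)


def write_bundle_with_retry(bundle: List[Record], fail_after: int,
                            shuffle: List[Record]) -> None:
    """Simulate one bundle written to a shuffle under at-least-once semantics.

    The first attempt appends `fail_after` records and then "crashes"; the
    runner retries the whole bundle, so those records are written twice.
    """
    shuffle.extend(bundle[:fail_after])  # partial output of the failed attempt
    shuffle.extend(bundle)               # successful retry of the entire bundle


def read_shuffle(shuffle: List[Record], dedup: bool) -> List[str]:
    """Read values back, optionally dropping repeats by record ID."""
    if not dedup:
        return [value for _, value in shuffle]
    seen = set()
    out = []
    for record_id, value in shuffle:
        if record_id not in seen:
            seen.add(record_id)
            out.append(value)
    return out


shuffle: List[Record] = []
write_bundle_with_retry([("a", "1"), ("b", "2"), ("c", "3")], fail_after=2, shuffle=shuffle)
print(read_shuffle(shuffle, dedup=False))  # ['1', '2', '1', '2', '3']
print(read_shuffle(shuffle, dedup=True))   # ['1', '2', '3']
```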
>>>> > >
>>>> > > I'm all for more specific behaviors if it means we actually clarify what the original version is in the protos, since it's news to me (just now, because I looked) that the Java reshuffle promises GBK-like side effects. But that's a long-deprecated transform without a satisfying replacement for its usage, so it may be moot.
>>>> > >
>>>> > > Robert Burke
>>>> > >
>>>> > > On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles <k...@apache.org> wrote:
>>>> > >
>>>> > >> Hi all,
>>>> > >>
>>>> > >> Just when you thought I had squeezed all the possible interest out of this most boring-seeming of transforms :-)
>>>> > >>
>>>> > >> I wrote up a very quick proposal as a doc [1]. It is short enough that I will also put the main idea and main question in this email so you can quickly read. Best to put comments in the doc.
>>>> > >>
>>>> > >> Main idea: add a variation of Reshuffle that allows duplicates, aka "at least once", so that users and runners can benefit from efficiency if it is possible.
>>>> > >>
>>>> > >> Main question: is it best as a parameter to existing reshuffle transforms or as new URN(s)? I have proposed it as a parameter but I think either one could work.
>>>> > >>
>>>> > >> I would love feedback on the main idea, main question, or anywhere on the doc.
>>>> > >>
>>>> > >> Thanks!
>>>> > >>
>>>> > >> Kenn
>>>> > >>
>>>> > >> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates