Closing the loop, I went with two URNs and an associated payload in
https://github.com/apache/beam/pull/30545
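
As a rough illustration only (this is not the code in the PR), the shape is
two URNs, one per input/output type, plus a payload flag for duplicates. The
URN strings and the class and function names below are assumptions made for
this sketch; check the PR for the real identifiers.

```python
from dataclasses import dataclass

# Hypothetical URN strings, chosen for illustration only.
URN_ARBITRARILY = "beam:transform:redistribute_arbitrarily:v1"
URN_BY_KEY = "beam:transform:redistribute_by_key:v1"


@dataclass(frozen=True)
class RedistributePayload:
    """Illustrative stand-in for the payload attached to either URN."""
    allow_duplicates: bool = False


@dataclass(frozen=True)
class RedistributeSpec:
    """A URN together with its payload, as a runner would see it."""
    urn: str
    payload: RedistributePayload


def redistribute_spec(by_key: bool, allow_duplicates: bool = False) -> RedistributeSpec:
    """Map the 2x2 of user-facing choices onto two URNs plus one flag."""
    urn = URN_BY_KEY if by_key else URN_ARBITRARILY
    return RedistributeSpec(urn, RedistributePayload(allow_duplicates))


def runner_may_skip_dedup(spec: RedistributeSpec) -> bool:
    """Dedup/persistence may be skipped only when duplicates are allowed."""
    return spec.payload.allow_duplicates
```

With this shape, all four user-facing variants map onto two URNs, and a
runner that ignores the flag still gets the default no-duplicates behavior.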

Kenn

On Wed, Mar 6, 2024 at 10:54 AM Kenneth Knowles <k...@apache.org> wrote:

> OK, of course, while hacking this up I found there's already a combinatorial
> 2x2 that perhaps people were alluding to but I missed.
>
> RedistributeByKey (user's choice)
> RedistributeArbitrarily (runner's choice! default may be random keys but
> that is not required)
>
> RedistributeArbitrarilyAllowingDuplicates (this is the use case I am
> trying to get at with the design & impl - basically runner's choice and
> also no need to dedup or persist)
> RedistributeByKeyAllowingDuplicates (is this an important use case? I
> don't know - if so, then it points to some future where you tag any
> transform with this)
>
> So now I kind of want to have two URNs (one per input/output type) and a
> config that allows duplicates.
>
> WDYT? Do the people who liked having separate URNs want to have 4 URNs? We
> can still have whatever end-user SDK interface we need to have regardless.
> I think in Java we want it to look like this regardless:
>
> Redistribute.arbitrarily()
> Redistribute.byKey()
> Redistribute.arbitrarily().allowingDuplicates()
> Redistribute.byKey().allowingDuplicates()
>
> And Python
>
> beam.Redistribute()
> beam.RedistributeByKey()
> beam.Redistribute(allowDuplicates=True)
> beam.RedistributeByKey(allowDuplicates=True)
>
> I'll add end-user APIs to the design doc (and ask for help on Python and
> Go idioms) but they are pretty short and sweet.
>
> Kenn
>
> On Thu, Feb 8, 2024 at 1:45 PM Robert Burke <rob...@frantil.com> wrote:
>
>> Was that only October? Wow.
>>
>> Option 2 SGTM, with the adjustment to making the core of the URN
>> "redistribute_allowing_duplicates" instead of building from the unspecified
>> Reshuffle semantics.
>>
>> Transforms getting updated to use the new transform can have the
>> @RequiresStableInput annotation added accordingly if they need that
>> property, per previous discussions.
>>
>>
>>
>> On Thu, Feb 8, 2024, 10:31 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>>
>>>
>>> On Wed, Feb 7, 2024 at 5:15 PM Robert Burke <lostl...@apache.org> wrote:
>>>
>>>> OK, so my stance is a configurable Reshuffle might be interesting, so
>>>> my vote is +1, along the following lines.
>>>>
>>>> 1. Use a new URN (beam:transform:reshuffle:v2) and attach a new
>>>> ReshufflePayload to it.
>>>>
>>>
>>> Ah, I see there's more than one variation of the "new URN" approach.
>>> Namely, you have a new version of an existing URN prefix, while I had in
>>> mind that it was a totally new base URN. In other words the open question I
>>> meant to pose is between these options:
>>>
>>> 1. beam:transform:reshuffle:v2 + { allowing_duplicates: true }
>>> 2. beam:transform:reshuffle_allowing_duplicates:v1 {}
>>>
>>> The most compelling argument in favor of option 2 is that it could have
>>> a distinct payload type associated with the different URN (maybe parameters
>>> around tweaking how much duplication? I don't know... I actually expect
>>> neither payload to evolve much if at all).
>>>
>>> There were also two comments in favor of option 2 on the design doc.
>>>
>>>>   -> Unknown "urns for composite transforms" already default to the
>>>> subtransform graph implementation for most (all?) runners.
>>>>   -> Having a payload to toggle this behavior then can have whatever
>>>> desired behavior we like. It also allows for additional configurations
>>>> added in later on. This is preferable to a plethora of one-off urns IMHO.
>>>> We can have SDKs gate configuration combinations as needed if additional
>>>> ones appear.
>>>>
>>>> 2. It's very cheap to add but also ignore, as the default is "Do what
>>>> we're already doing without change", and not all SDKs need to add it right
>>>> away. It's more important that the portable way is defined at least, so
>>>> it's easy for other SDKs to add and handle it.
>>>>
>>>> I would prefer we have a clear starting point on what Reshuffle does
>>>> though. I remain a fan of "The Reshuffle (v2) Transform is a user
>>>> designated hint to a runner for a change in parallelism. By default, it
>>>> produces an output PCollection that has the same elements as the input
>>>> PCollection".
>>>>
>>>
>>> +1 this is a better phrasing of the spec I propose in
>>> https://s.apache.org/beam-redistribute but let's not get into it here
>>> if we can, and just evaluate the delta from that design to
>>> https://s.apache.org/beam-reshuffle-allowing-duplicates
>>>
>>> Kenn
>>>
>>>
>>>> It remains an open question about what that means for
>>>> checkpointing/durability behavior, but that's largely been runner dependent
>>>> anyway. I admit the above definition is biased by the uses of Reshuffle I'm
>>>> aware of, which largely are to incur a fusion break in the execution graph.
>>>>
>>>> Robert Burke
>>>> Beam Go Busybody
>>>>
>>>> On 2024/01/31 16:01:33 Kenneth Knowles wrote:
>>>> > On Wed, Jan 31, 2024 at 4:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>> >
>>>> > > Hi,
>>>> > >
>>>> > > if I understand this proposal correctly, the motivation is actually
>>>> > > reducing latency by bypassing bundle atomic guarantees: bundles after
>>>> > > an "at least once" Reshuffle would be reconstructed independently of the
>>>> > > pre-shuffle bundling. Provided this is correct, it seems that the behavior
>>>> > > is slightly more general than the case of Reshuffle. We already have
>>>> > > some transforms that manipulate a specific property of a PCollection:
>>>> > > whether or not it may contain duplicates. That is manipulated in two ways:
>>>> > > explicitly removing duplicates based on IDs on sources that generate
>>>> > > duplicates, and using @RequiresStableInput, mostly in sinks. These
>>>> > > techniques modify an inherent property of a PCollection, namely whether it
>>>> > > may contain duplicates originating from the same input element.
>>>> > >
>>>> > > There are two types of duplicates: duplicate elements in _different
>>>> > > bundles_ (typically from at-least-once sources) and duplicates arising due
>>>> > > to bundle reprocessing (affecting only transforms with side effects, which
>>>> > > is what we solve by @RequiresStableInput). The point I'm trying to get to:
>>>> > > should we add these properties to PCollections (contains cross-bundle
>>>> > > duplicates vs. does not) and PTransforms ("outputs deduplicated elements"
>>>> > > and "requires stable input")? That would allow us to analyze the Pipeline
>>>> > > DAG and provide an appropriate implementation for Reshuffle automatically,
>>>> > > so that a new URN or flag would not be needed. Moreover, this might be
>>>> > > useful for a broader range of optimizations.
>>>> > >
>>>> > > WDYT?
>>>> > >
>>>> > These are interesting ideas that could be useful. I think they achieve a
>>>> > different goal in my case. I actually want to explicitly allow
>>>> > Reshuffle.allowingDuplicates() to skip expensive parts of its
>>>> > implementation that are used to prevent duplicates.
>>>> >
>>>> > The property that would make it possible to automate this in the case of
>>>> > combiners, or at least validate that the pipeline still gives 100% accurate
>>>> > answers, would be something like @InsensitiveToDuplicateElements, which is
>>>> > longer and less esoteric than @Idempotent. For situations where there is a
>>>> > source or sink that only has at-least-once guarantees, then yes, maybe the
>>>> > property "has duplicates" will let you know that you may as well use the
>>>> > duplicating reshuffle without any loss. But still, you may not want to
>>>> > introduce *more* duplicates.
>>>> >
>>>> > I would say my proposal is a step in this direction that would gain some
>>>> > experience and tools that we might later use in a more automated way.
>>>> >
>>>> > Kenn
>>>> >
>>>> > >  Jan
>>>> > > On 1/30/24 23:22, Robert Burke wrote:
>>>> > >
>>>> > > Is the benefit of this proposal just the bounded deviation from the
>>>> > > existing reshuffle?
>>>> > >
>>>> > > Reshuffle is already rather dictated by arbitrary runner choice, from
>>>> > > simply ignoring the node, to forcing a materialization break, to a full
>>>> > > shuffle implementation which has additional side effects.
>>>> > >
>>>> > > But model-wise I don't believe it guarantees specific checkpointing or
>>>> > > re-execution behavior as currently specified. The proto only says it
>>>> > > represents the operation (without specifying the behavior, which is a big
>>>> > > problem).
>>>> > >
>>>> > > I guess my concern here is that it implies/codifies that the existing
>>>> > > reshuffle has more behavior than it promises outside of the Java SDK.
>>>> > >
>>>> > > "Allowing duplicates" WRT reshuffle is tricky. It feels like it mostly
>>>> > > allows an implementation in which the inputs into the reshuffle might be
>>>> > > re-executed, for example. But that's always at the runner's discretion,
>>>> > > and ultimately it could also prevent even getting the intended benefit of
>>>> > > a reshuffle (notionally, just a fusion break).
>>>> > >
>>>> > > Is there even a valid way to implement the notion of a reshuffle that
>>>> > > leads to duplicates outside of a retry/resilience case?
>>>> > >
>>>> > > -------
>>>> > >
>>>> > > To be clear, I'm not against the proposal. I'm against it being
>>>> > > built on a non-existent foundation. If the behavior isn't already defined,
>>>> > > it's impossible to specify a real deviation from it.
>>>> > >
>>>> > > I'm all for more specific behaviors if it means we actually clarify what
>>>> > > the original version is in the protos, since it's news to me (just now,
>>>> > > because I looked) that the Java reshuffle promises GBK-like side effects.
>>>> > > But that's a long-deprecated transform without a satisfying replacement
>>>> > > for its usage, so it may be moot.
>>>> > >
>>>> > > Robert Burke
>>>> > >
>>>> > >
>>>> > >
>>>> > > On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles <k...@apache.org> wrote:
>>>> > >
>>>> > >> Hi all,
>>>> > >>
>>>> > >> Just when you thought I had squeezed all the possible interest out of
>>>> > >> this most boring-seeming of transforms :-)
>>>> > >>
>>>> > >> I wrote up a very quick proposal as a doc [1]. It is short enough that I
>>>> > >> will also put the main idea and main question in this email so you can
>>>> > >> read it quickly. Best to put comments in the doc.
>>>> > >>
>>>> > >> Main idea: add a variation of Reshuffle that allows duplicates, aka "at
>>>> > >> least once", so that users and runners can benefit from efficiency if it
>>>> > >> is possible.
>>>> > >>
>>>> > >> Main question: is it best as a parameter to existing reshuffle transforms
>>>> > >> or as new URN(s)? I have proposed it as a parameter but I think either one
>>>> > >> could work.
>>>> > >>
>>>> > >> I would love feedback on the main idea, main question, or anywhere on the
>>>> > >> doc.
>>>> > >>
>>>> > >> Thanks!
>>>> > >>
>>>> > >> Kenn
>>>> > >>
>>>> > >> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates
>>>> > >>
>>>> > >
>>>> >
>>>>
>>>
