Was that only October? Wow.

Option 2 SGTM, with the adjustment to making the core of the URN
"redistribute_allowing_duplicates" instead of building from the unspecified
Reshuffle semantics.

Transforms getting updated to use the new transform can have their
@RequiresStableInputs annotation added  accordingly if they need that
property per previous discussions.



On Thu, Feb 8, 2024, 10:31 AM Kenneth Knowles <k...@apache.org> wrote:

>
>
> On Wed, Feb 7, 2024 at 5:15 PM Robert Burke <lostl...@apache.org> wrote:
>
>> OK, so my stance is a configurable Reshuffle might be interesting, so my
>> vote is +1, along the following lines.
>>
>> 1. Use a new URN (beam:transform:reshuffle:v2) and attach a new
>> ReshufflePayload to it.
>>
>
> Ah, I see there's more than one variation of the "new URN" approach.
> Namely, you have a new version of an existing URN prefix, while I had in
> mind that it was a totally new base URN. In other words the open question I
> meant to pose is between these options:
>
> 1. beam:transform:reshuffle:v2 + { allowing_duplicates: true }
> 2. beam:transform:reshuffle_allowing_duplicates:v1 {}
>
> The most compelling argument in favor of option 2 is that it could have a
> distinct payload type associated with the different URN (maybe parameters
> around tweaking how much duplication? I don't know... I actually expect
> neither payload to evolve much if at all).
>
> There were also two comments in favor of option 2 on the design doc.
>
>   -> Unknown "urns for composite transforms" already default to the
>> subtransform graph implementation for most (all?) runners.
>>   -> Having a payload to toggle this behavior then can have whatever
>> desired behavior we like. It also allows for additional configurations
>> added in later on. This is preferable to a plethora of one-off urns IMHO.
>> We can have SDKs gate configuration combinations as needed if additional
>> ones appear.
>>
>> 2. It's very cheap to add but also ignore, as the default is "Do what
>> we're already doing without change", and not all SDKs need to add it right
>> away. It's more important that the portable way is defined at least, so
>> it's easy for other SDKs to add and handle it.
>>
>> I would prefer we have a clear starting point on what Reshuffle does
>> though. I remain a fan of "The Reshuffle (v2) Transform is a user
>> designated hint to a runner for a change in parallelism. By default, it
>> produces an output PCollection that has the same elements as the input
>> PCollection".
>>
>
> +1 this is a better phrasing of the spec I propose in
> https://s.apache.org/beam-redistribute but let's not get into it here if
> we can, and just evaluate the delta from that design to
> https://s.apache.org/beam-reshuffle-allowing-duplicates
>
> Kenn
>
>
>> It remains an open question about what that means for
>> checkpointing/durability behavior, but that's largely been runner dependent
>> anyway. I admit the above definition is biased by the uses of Reshuffle I'm
>> aware of, which largely are to incur a fusion break in the execution graph.
>>
>> Robert Burke
>> Beam Go Busybody
>>
>> On 2024/01/31 16:01:33 Kenneth Knowles wrote:
>> > On Wed, Jan 31, 2024 at 4:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > > Hi,
>> > >
>> > > if I understand this proposal correctly, the motivation is actually
>> > > reducing latency by bypassing bundle atomic guarantees, bundles after
>> "at
>> > > least once" Reshuffle would be reconstructed independently of the
>> > > pre-shuffle bundling. Provided this is correct, it seems that the
>> behavior
>> > > is slightly more general than for the case of Reshuffle. We have
>> already
>> > > some transforms that manipulate a specific property of a PCollection
>> - if
>> > > it may or might not contain duplicates. That is manipulated in two
>> ways -
>> > > explicitly removing duplicates based on IDs on sources that generate
>> > > duplicates and using @RequiresStableInput, mostly in sinks. These
>> > > techniques modify an inherent property of a PCollection, that is if it
>> > > contains or does not contain possible duplicates originating from the
>> same
>> > > input element.
>> > >
>> > > There are two types of duplicates - duplicate elements in _different
>> > > bundles_ (typically from at-least-once sources) and duplicates
>> arising due
>> > > to bundle reprocessing (affecting only transforms with side-effects,
>> that
>> > > is what we solve by @RequiresStableInput). The point I'm trying to
>> get to -
>> > > should we add these properties to PCollections (contains cross-bundle
>> > > duplicates vs. does not) and PTransforms ("outputs deduplicated
>> elements"
>> > > and "requires stable input")? That would allow us to analyze the
>> Pipeline
>> > > DAG and provide appropriate implementation for Reshuffle
>> automatically, so
>> > > that a new URN or flag would not be needed. Moreover, this might be
>> useful
>> > > for a broader range of optimizations.
>> > >
>> > > WDYT?
>> > >
>> > These are interesting ideas that could be useful. I think they achieve a
>> > different goal in my case. I actually want to explicitly allow
>> > Reshuffle.allowingDuplicates() to skip expensive parts of its
>> > implementation that are used to prevent duplicates.
>> >
>> > The property that would make it possible to automate this in the case of
>> > combiners, or at least validate that the pipeline still gives 100%
>> accurate
>> > answers, would be something like @InsensitiveToDuplicateElements which
>> is
>> > longer and less esoteric than @Idempotent. For situations where there
>> is a
>> > source or sink that only has at-least-once guarantees then yea maybe the
>> > property "has duplicates" will let you know that you may as well use the
>> > duplicating reshuffle without any loss. But still, you may not want to
>> > introduce *more* duplicates.
>> >
>> > I would say my proposal is a step in this direction that would gain some
>> > experience and tools that we might later use in a more automated way.
>> >
>> > Kenn
>> >
>> > >  Jan
>> > > On 1/30/24 23:22, Robert Burke wrote:
>> > >
>> > > Is the benefit of this proposal just the bounded deviation from the
>> > > existing reshuffle?
>> > >
>> > > Reshuffle is already rather dictated by arbitrary runner choice, from
>> > > simply ignoring the node, to forcing a materialization break, to a
>> full
>> > > shuffle implementation which has additional side effects.
>> > >
>> > > But model wise I don't believe it guarantees specific checkpointing or
>> > > re-execution behavior as currently specified. The proto only says it
>> > > represents the operation (without specifying the behavior, that is a
>> big
>> > > problem).
>> > >
>> > > I guess my concern here is that it implies/codifies that the existing
>> > > reshuffle has more behavior than it promises outside of the Java SDK.
>> > >
>> > > "Allowing duplicates" WRT reshuffle is tricky. It feels like mostly
>> allows
>> > > an implementation that may mean the inputs into the reshuffle might be
>> > > re-executed for example. But that's always under the runner's
>> discretion ,
>> > > and ultimately it could also prevent even getting the intended
>> benefit of a
>> > > reshuffle (notionally, just a fusion break).
>> > >
>> > > Is there even a valid way to implement the notion of a reshuffle that
>> > > leads to duplicates outside of a retry/resilience case?
>> > >
>> > > -------
>> > >
>> > > To be clear, I'm not against the proposal. I'm against that its being
>> > > built on a non-existent foundation. If the behavior isn't already
>> defined,
>> > > it's impossible to specify a real deviation from it.
>> > >
>> > > I'm all for more specific behaviors if means we actually clarify what
>> the
>> > > original version is in the protos, since its news to me ( just now,
>> because
>> > > I looked) that the Java reshuffle promises GBK-like side effects. But
>> > > that's a long deprecated transform without a satisfying replacement
>> for
>> > > it's usage, so it may be moot.
>> > >
>> > > Robert Burke
>> > >
>> > >
>> > >
>> > > On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles <k...@apache.org>
>> wrote:
>> > >
>> > >> Hi all,
>> > >>
>> > >> Just when you thought I had squeezed all the possible interest out of
>> > >> this most boring-seeming of transforms :-)
>> > >>
>> > >> I wrote up a very quick proposal as a doc [1]. It is short enough
>> that I
>> > >> will also put the main idea and main question in this email so you
>> can
>> > >> quickly read. Best to put comments in the.
>> > >>
>> > >> Main idea: add a variation of Reshuffle that allows duplicates, aka
>> "at
>> > >> least once", so that users and runners can benefit from efficiency
>> if it is
>> > >> possible
>> > >>
>> > >> Main question: is it best as a parameter to existing reshuffle
>> transforms
>> > >> or as new URN(s)? I have proposed it as a parameter but I think
>> either one
>> > >> could work.
>> > >>
>> > >> I would love feedback on the main idea, main question, or anywhere
>> on the
>> > >> doc.
>> > >>
>> > >> Thanks!
>> > >>
>> > >> Kenn
>> > >>
>> > >> [1] https://s.apache.org/beam-reshuffle-allowing-duplicates
>> > >>
>> > >
>> >
>>
>

Reply via email to