Hi,
if I understand this proposal correctly, the motivation is actually to
reduce latency by bypassing bundle atomicity guarantees: bundles after
an "at least once" Reshuffle would be reconstructed independently of
the pre-shuffle bundling. Provided this is correct, it seems that the
behavior is slightly more general than the Reshuffle case. We already
have transforms that manipulate a specific property of a PCollection,
namely whether or not it may contain duplicates. That property is
manipulated in two ways: explicitly removing duplicates based on IDs
for sources that generate duplicates, and using @RequiresStableInput,
mostly in sinks. These techniques modify an inherent property of a
PCollection, that is, whether or not it may contain duplicates
originating from the same input element.
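For the second technique, here is a minimal sketch (mine, just for
illustration, not from the proposal) of a side-effecting sink DoFn
using the Java SDK's @RequiresStableInput; the external write itself is
only a placeholder:

    import org.apache.beam.sdk.transforms.DoFn;

    // Sink-style DoFn with side effects. @RequiresStableInput asks the
    // runner to make the input stable (e.g. by checkpointing it) so that
    // any retry replays exactly the same elements before the side effect
    // runs.
    class StableInputSinkFn extends DoFn<String, Void> {

      @RequiresStableInput
      @ProcessElement
      public void processElement(@Element String element) {
        // writeToExternalSystem(element);  // placeholder for the real side effect
      }
    }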
There are two types of duplicates: duplicate elements in _different
bundles_ (typically from at-least-once sources) and duplicates arising
from bundle reprocessing (affecting only transforms with side effects,
which is what we solve with @RequiresStableInput). The point I'm trying
to get to is: should we add these properties to PCollections ("contains
cross-bundle duplicates" vs. "does not") and PTransforms ("outputs
deduplicated elements" and "requires stable input")? That would allow us
to analyze the Pipeline DAG and provide an appropriate implementation
for Reshuffle automatically, so that a new URN or flag would not be needed.
Moreover, this might be useful for a broader range of optimizations.
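Purely as an illustration of what I mean (hypothetical, none of these
types exist in Beam today), the property tracking could look roughly
like this:

    // Hypothetical sketch only, not an existing or proposed Beam API.
    // The idea: track the duplication property through the Pipeline DAG
    // so a runner can choose the Reshuffle implementation by itself.
    final class DuplicationAnalysisSketch {

      enum DuplicationProperty {
        MAY_CONTAIN_CROSS_BUNDLE_DUPLICATES,
        NO_CROSS_BUNDLE_DUPLICATES
      }

      // One possible, simplified decision rule: the cheaper "at least
      // once" Reshuffle is acceptable if the input may already contain
      // duplicates anyway, or if no downstream transform requires
      // stable input.
      static boolean canRelaxReshuffle(
          DuplicationProperty input, boolean downstreamRequiresStableInput) {
        return input == DuplicationProperty.MAY_CONTAIN_CROSS_BUNDLE_DUPLICATES
            || !downstreamRequiresStableInput;
      }
    }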
WDYT?
Jan
On 1/30/24 23:22, Robert Burke wrote:
Is the benefit of this proposal just the bounded deviation from the
existing reshuffle?
Reshuffle's behavior is already largely dictated by arbitrary runner
choice, from simply ignoring the node, to forcing a materialization
break, to a full shuffle implementation with additional side effects.
But model-wise I don't believe it guarantees specific checkpointing or
re-execution behavior as currently specified. The proto only says it
represents the operation (without specifying the behavior, which is a
big problem).
I guess my concern here is that it implies/codifies that the existing
reshuffle has more behavior than it promises outside of the Java SDK.
"Allowing duplicates" WRT reshuffle is tricky. It feels like it mostly
allows an implementation where, for example, the inputs into the
reshuffle might be re-executed. But that's always under the runner's
discretion, and ultimately it could also prevent even getting the
intended benefit of a reshuffle (notionally, just a fusion break).
Is there even a valid way to implement the notion of a reshuffle that
leads to duplicates outside of a retry/resilience case?
-------
To be clear, I'm not against the proposal. I'm against its being
built on a non-existent foundation. If the behavior isn't already
defined, it's impossible to specify a real deviation from it.
I'm all for more specific behaviors if it means we actually clarify
what the original version is in the protos, since it's news to me
(just now, because I looked) that the Java reshuffle promises GBK-like
side effects. But that's a long-deprecated transform without a
satisfying replacement for its usage, so it may be moot.
Robert Burke
On Tue, Jan 30, 2024, 1:34 PM Kenneth Knowles <k...@apache.org> wrote:
Hi all,
Just when you thought I had squeezed all the possible interest out
of this most boring-seeming of transforms :-)
I wrote up a very quick proposal as a doc [1]. It is short enough
that I will also put the main idea and main question in this email
so you can quickly read. Best to put comments in the doc.
Main idea: add a variation of Reshuffle that allows duplicates,
aka "at least once", so that users and runners can benefit from
the efficiency gain where it is possible.
Main question: is it best as a parameter to existing reshuffle
transforms or as new URN(s)? I have proposed it as a parameter but
I think either one could work.
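Just to make the two options concrete, here is a rough sketch (all
names invented here, not necessarily what the doc ends up with):

    // Hypothetical illustration only; invented names, not the doc's proposal.
    final class ReshuffleVariantsSketch {

      // Option A: a flag carried as a parameter of the existing Reshuffle
      // transform, i.e. the same URN with an extra field in its payload.
      static final class ReshuffleWithFlag {
        boolean allowDuplicates = false;

        ReshuffleWithFlag withAllowDuplicates(boolean allow) {
          this.allowDuplicates = allow;
          return this;
        }
      }

      // Option B: a separate transform identified by a brand new URN.
      static final String HYPOTHETICAL_URN =
          "beam:transform:reshuffle_allowing_duplicates:v1";
    }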
I would love feedback on the main idea, main question, or anywhere
on the doc.
Thanks!
Kenn
[1] https://s.apache.org/beam-reshuffle-allowing-duplicates