Hi,
there is one more thing to mention in relation to
Reshuffle/RequiresStableInput: our current
implementation of RequiresStableInput can break without Reshuffle in
some corner cases on most portable runners, at least with the Java
GreedyPipelineFuser, see [1]. The only way to work around this currently
is to insert Reshuffle (or any other fusion-break transform) directly
before the stable DoFn (Reshuffle is handy because it does not change
the data). I think we should either fix the issue [1], or include
a fusion break as a mandatory requirement for the new Redistribute
transform as well (at least in some variant), or possibly add a new
"hint" for non-optional fusion breaking.
Jan
[1] https://github.com/apache/beam/issues/24655
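
To make the workaround above concrete, here is a toy model in plain Python. It is not Beam code and does not reproduce the GreedyPipelineFuser: the Step class, the fusion_break flag and fuse_greedily are hypothetical names used only for illustration. It shows the one property the workaround relies on, namely that a fusion-break step placed directly before the stable DoFn forces that DoFn into a stage of its own.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Step:
    name: str
    # Hypothetical flag: the fuser must end the current stage after this step.
    fusion_break: bool = False


def fuse_greedily(steps):
    """Group a linear chain of steps into stages.

    Consecutive steps are fused into one stage; a step marked as a
    fusion break closes the stage it belongs to.
    """
    stages, current = [], []
    for step in steps:
        current.append(step.name)
        if step.fusion_break:
            stages.append(current)
            current = []
    if current:
        stages.append(current)
    return stages


# Without a fusion break, the stable DoFn is fused with its upstream steps.
fused = fuse_greedily([Step("Read"), Step("Map"), Step("StableDoFn")])

# With Reshuffle acting as a fusion break, the stable DoFn starts a new stage.
with_break = fuse_greedily(
    [Step("Read"), Step("Map"), Step("Reshuffle", fusion_break=True), Step("StableDoFn")]
)
```

In this model, making the fusion break mandatory for Redistribute would correspond to the flag always being set for that step.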
On 10/5/23 21:01, Robert Burke wrote:
Reshuffle/redistribute being a transform has the benefit of allowing
existing runners that aren't updated to be aware of the new URNs to
rely on an SDK side implementation, which may be more expensive than
what the runner is able to do with that awareness.
Aka: it gives purpose to the fallback implementations.
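
The fallback behaviour described above can be sketched as a small selection function. This is a simplified model, not Beam's actual translation code: plan_transform, the URN string and the sub-transform names in EXPANSION are all placeholders invented for the example.

```python
def plan_transform(urn, runner_known_urns, sdk_expansion):
    """Return the steps a runner would execute for a composite transform.

    If the runner recognises the URN it substitutes its own implementation;
    otherwise it runs the sub-transforms supplied by the SDK.
    """
    if urn in runner_known_urns:
        return [f"native:{urn}"]
    return list(sdk_expansion)


# Placeholder values, not real Beam identifiers.
URN = "example:redistribute:v1"
EXPANSION = ["AssignRandomKey", "GroupByKey", "DropKey"]

aware_runner = plan_transform(URN, {URN}, EXPANSION)
unaware_runner = plan_transform(URN, set(), EXPANSION)
```

A runner that has not been updated still produces a valid plan, just a potentially more expensive one.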
On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
Another perspective, ignoring runners' custom implementations and
non-Java SDKs, could be that the semantics are perfectly well
defined: it is a composite and its semantics are defined by its
implementation in terms of primitives. It is just that this
expansion is not what we want so we should not use it (and also we
shouldn't use "whatever the implementation does" as a spec for
anything we care about).
On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org>
wrote:
I totally agree. I am motivated right now by the fact that it
is already used all over the place but with no consistent
semantics. Maybe it is simpler to focus on just making the
minimal change, which would basically be to update the
expansion of the Reshuffle in the Java SDK.
Kenn
On Thu, Oct 5, 2023 at 11:39 AM John Casey
<theotherj...@google.com> wrote:
Given that this is a hint, I'm not sure redistribute
should be a PTransform as opposed to some other way to
hint to a runner.
I'm not sure of what the syntax of that would be, but a
semantic no-op transform that the runner may or may not do
anything with is odd.
On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles
<k...@apache.org> wrote:
So a high level suggestion from Robert that I want to
highlight as a top-post:
Instead of focusing on just fixing the SDKs' and
runners' Reshuffle, this could be an opportunity to
introduce Redistribute, which was proposed in the
long-ago thread. The semantics are identical but it is
clearer that it is /only/ a hint about
redistributing data and there is no expectation of a
checkpoint.
This new name may also be an opportunity to maintain
update compatibility (though this may actually be
leaving unsafe code in users' hands) and/or
separate @RequiresStableInput/checkpointing uses of
Reshuffle from redistribution-only uses of Reshuffle.
Any other thoughts on this one high level bit?
Kenn
On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles
<k...@apache.org> wrote:
On Wed, Oct 4, 2023 at 7:45 PM Robert Burke
<lostl...@apache.org> wrote:
LGTM.
It looks like the Go SDK already adheres to these
semantics as well for the reference impl (well,
reshuffle/redistribute_randomly; _by_key isn't
implemented in the Go SDK, which only uses the
existing unqualified reshuffle URN [0]).
The original strategy, and then for every
element, the original Window, TS, and Pane are
all serialized, shuffled, and then
deserialized downstream.
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
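
As a rough illustration of the round trip described above, here is a toy model in plain Python. It is not the Go SDK code linked above: the JSON encoding, the function names and the bucket count are assumptions made for the example. The only point it demonstrates is that each element's window, timestamp and pane travel with the value through the shuffle and come out unchanged.

```python
import json
import random
from collections import defaultdict


def encode(value, window, timestamp, pane):
    # Bundle the value with its metadata so the shuffle sees one opaque blob.
    return json.dumps({"v": value, "w": window, "ts": timestamp, "pane": pane})


def decode(blob):
    d = json.loads(blob)
    return d["v"], d["w"], d["ts"], d["pane"]


def reshuffle(elements, num_keys=4, seed=0):
    """Assign each encoded element a random key, group by key, then decode."""
    rng = random.Random(seed)
    buckets = defaultdict(list)
    for value, window, timestamp, pane in elements:
        buckets[rng.randrange(num_keys)].append(encode(value, window, timestamp, pane))
    out = []
    for key in sorted(buckets):
        out.extend(decode(blob) for blob in buckets[key])
    return out


elements = [("a", "w0", 10, 0), ("b", "w0", 12, 0), ("c", "w1", 61, 1)]
shuffled = reshuffle(elements)
```

The output order may differ from the input order, but the multiset of (value, window, timestamp, pane) tuples is the same.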
Prism at the moment vacuously implements
reshuffle by omitting the node, and rewriting
the inputs and outputs [1], as it's a local
runner with single transform per bundle
execution, but I was intending to make it a
fusion break regardless. Ultimately prism's
"test" variant will default to executing the
SDK's dictated reference implementation for the
composite(s), and any "fast" or "prod" variant
would simply do the current implementation.
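
The "omit the node and rewrite the inputs and outputs" approach can be sketched as a small graph rewrite. This is a toy model in Python, not the Prism handler linked at [1]: the dictionary layout, the elide_node function and the transform and PCollection names are made up for the example, and it assumes the elided node has exactly one input and one output.

```python
def elide_node(graph, node):
    """Remove `node` and point its consumers at its input PCollection.

    `graph` maps a transform name to {"inputs": [...], "outputs": [...]},
    where the list entries are PCollection names.
    """
    (inp,) = graph[node]["inputs"]
    (outp,) = graph[node]["outputs"]
    rewritten = {}
    for name, spec in graph.items():
        if name == node:
            continue
        rewritten[name] = {
            # Consumers of the elided node's output now read its input directly.
            "inputs": [inp if pc == outp else pc for pc in spec["inputs"]],
            "outputs": list(spec["outputs"]),
        }
    return rewritten


graph = {
    "Read": {"inputs": [], "outputs": ["pc1"]},
    "Reshuffle": {"inputs": ["pc1"], "outputs": ["pc2"]},
    "Write": {"inputs": ["pc2"], "outputs": []},
}
without_reshuffle = elide_node(graph, "Reshuffle")
```

Note that a rewrite like this removes the node entirely, so it provides no fusion break on its own.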
Very nice!
And of course I should have linked out to the
existing reshuffle URN in the proto.
Kenn
Robert Burke
Beam Go Busybody
[0]:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
[1]:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
On 2023/09/26 15:43:53 Kenneth Knowles wrote:
> Hi everyone,
>
> Recently there was a bug [1] caused by discrepancies between two of
> Dataflow's reshuffle implementations. I think the reference implementation
> in the Java SDK [2] also does not match. This all led to discussion on the
> bug and the pull request [3] about what the actual semantics should be. I
> got it wrong, maybe multiple times. So I wrote up a very short document to
> finish the discussion:
>
> https://s.apache.org/beam-reshuffle
>
> This is also probably among the simplest imaginable use of
> http://s.apache.org/ptransform-design-doc in case you want to see kind of
> how I intended it to be used.
>
> Kenn
>
> [1] https://github.com/apache/beam/issues/28219
> [2] https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
> [3] https://github.com/apache/beam/pull/28272
>