Hi,

there is also one other thing to mention regarding Reshuffle/RequiresStableInput: our current implementation of RequiresStableInput can break without a Reshuffle in some corner cases on most portable runners, at least those using the Java GreedyPipelineFuser, see [1]. Currently, the only way to work around this is to insert a Reshuffle (or any other fusion-breaking transform) directly before the stable DoFn (Reshuffle is handy because it does not change the data). I think we should either fix the issue [1] somehow, or make the fusion break a mandatory requirement of the new Redistribute transform as well (at least for some variant), or possibly add a new "hint" for non-optional fusion breaking.
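To make the workaround concrete, here is a toy model in Go. It is not Beam's GreedyPipelineFuser, which is far more involved; `Step`, `fuse`, and `stableInputSafe` are hypothetical names. A linear chain is greedily fused into stages, and a @RequiresStableInput step counts as safe only if a fusion break immediately precedes it, so its input is materialized instead of being recomputed by fused upstream code.

```go
package main

import "fmt"

// Step is a hypothetical node in a linear chain of transforms. This is a toy
// model for illustration, not Beam's GreedyPipelineFuser.
type Step struct {
	Name        string
	FusionBreak bool // output is materialized, e.g. GroupByKey or Reshuffle
	StableInput bool // DoFn annotated with @RequiresStableInput
}

// fuse greedily packs the chain into stages, closing the current stage after
// every fusion-breaking step because that step's output is materialized.
func fuse(chain []Step) [][]Step {
	var stages [][]Step
	var cur []Step
	for _, s := range chain {
		cur = append(cur, s)
		if s.FusionBreak {
			stages = append(stages, cur)
			cur = nil
		}
	}
	if len(cur) > 0 {
		stages = append(stages, cur)
	}
	return stages
}

// stableInputSafe reports whether every stable DoFn starts its stage, i.e.
// its input was materialized by the step before it rather than recomputed
// by fused upstream code on a retry.
func stableInputSafe(stages [][]Step) bool {
	for _, stage := range stages {
		for i, s := range stage {
			if s.StableInput && i > 0 {
				return false
			}
		}
	}
	return true
}

func main() {
	withoutBreak := []Step{{Name: "read"}, {Name: "map"}, {Name: "sink", StableInput: true}}
	withBreak := []Step{{Name: "read"}, {Name: "map"}, {Name: "reshuffle", FusionBreak: true}, {Name: "sink", StableInput: true}}
	fmt.Println(stableInputSafe(fuse(withoutBreak))) // false
	fmt.Println(stableInputSafe(fuse(withBreak)))    // true
}
```

In this model the Reshuffle matters only because it ends a stage; the data it passes through is unchanged.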

 Jan

[1] https://github.com/apache/beam/issues/24655

On 10/5/23 21:01, Robert Burke wrote:
Reshuffle/redistribute being a transform has the benefit of allowing existing runners that haven't been updated to recognize the new URNs to rely on an SDK-side implementation, which may be more expensive than what the runner could do with that awareness.

Aka: it gives purpose to the fallback implementations.
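A minimal Go sketch of that fallback behavior, under simplifying assumptions: `Transform`, `plan`, and the `example:` URNs are hypothetical, the expansion is a stand-in rather than any SDK's real Reshuffle expansion, and only the reshuffle and GroupByKey URNs are taken from Beam. A runner that recognizes the composite keeps it as one node; a runner that doesn't runs the SDK-provided subtransforms.

```go
package main

import "fmt"

// Transform is a hypothetical, simplified pipeline node: a URN plus, for a
// composite, the subtransforms its SDK-side (fallback) expansion consists of.
type Transform struct {
	URN           string
	Subtransforms []Transform
}

// reshuffle models a composite whose fallback expansion is random keying,
// a GroupByKey, and dropping the keys. Only the reshuffle and GroupByKey
// URNs come from Beam; the "example:" URNs are made up.
var reshuffle = Transform{
	URN: "beam:transform:reshuffle:v1",
	Subtransforms: []Transform{
		{URN: "example:assign_random_key"},
		{URN: "beam:transform:group_by_key:v1"},
		{URN: "example:drop_key"},
	},
}

// plan returns the URNs a runner would execute. A composite the runner
// recognizes stays a single node it can implement natively; an unrecognized
// composite is replaced by its fallback expansion. Leaves are treated as
// primitives every runner supports.
func plan(t Transform, known map[string]bool) []string {
	if known[t.URN] || len(t.Subtransforms) == 0 {
		return []string{t.URN}
	}
	var out []string
	for _, sub := range t.Subtransforms {
		out = append(out, plan(sub, known)...)
	}
	return out
}

func main() {
	aware := map[string]bool{"beam:transform:reshuffle:v1": true}
	fmt.Println(plan(reshuffle, aware)) // [beam:transform:reshuffle:v1]
	fmt.Println(plan(reshuffle, nil))   // the three fallback URNs
}
```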

On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:

    Another perspective, ignoring runners' custom implementations and
    non-Java SDKs, could be that the semantics are perfectly well
    defined: it is a composite and its semantics are defined by its
    implementation in terms of primitives. It is just that this
    expansion is not what we want, so we should not use it (and we
    also shouldn't use "whatever the implementation does" as a spec
    for anything we care about).

    On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org>
    wrote:

        I totally agree. I am motivated right now by the fact that it
        is already used all over the place but with no consistent
        semantics. Maybe it is simpler to focus on just making the
        minimal change, which would basically be to update the
        expansion of the Reshuffle in the Java SDK.

        Kenn

        On Thu, Oct 5, 2023 at 11:39 AM John Casey
        <theotherj...@google.com> wrote:

            Given that this is a hint, I'm not sure redistribute
            should be a PTransform as opposed to some other way to
            hint to a runner.

            I'm not sure what the syntax for that would be, but a
            semantic no-op transform that the runner may or may not do
            anything with is odd.

            On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles
            <k...@apache.org> wrote:

                So here is a high-level suggestion from Robert that I
                want to highlight as a top-post:

                Instead of focusing on just fixing the SDKs' and
                runners' Reshuffle, this could be an opportunity to
                introduce Redistribute, which was proposed in the
                long-ago thread. The semantics are identical, but it is
                clearer that it /only/ is a hint about redistributing
                data and that there is no expectation of a checkpoint.

                This new name may also be an opportunity to maintain
                update compatibility (though this may actually be
                leaving unsafe code in users' hands) and/or to
                separate @RequiresStableInput/checkpointing uses of
                Reshuffle from redistribution-only uses of Reshuffle.

                Any other thoughts on this one high-level bit?

                Kenn

                On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles
                <k...@apache.org> wrote:


                    On Wed, Oct 4, 2023 at 7:45 PM Robert Burke
                    <lostl...@apache.org> wrote:

                        LGTM.

                        It looks like the Go SDK already adheres to
                        these semantics as well for the reference
                        implementation (well, for
                        reshuffle/redistribute_randomly; _by_key isn't
                        implemented in the Go SDK), and it only uses
                        the existing unqualified reshuffle URN [0].

                        The original windowing strategy is preserved,
                        and for every element, the original window,
                        timestamp, and pane are all serialized,
                        shuffled, and then deserialized downstream.

                        
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65

                        
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145

                        Prism at the moment vacuously implements
                        reshuffle by omitting the node and rewriting
                        the inputs and outputs [1], since it's a local
                        runner that executes a single transform per
                        bundle, but I was intending to make it a
                        fusion break regardless. Ultimately, Prism's
                        "test" variant will default to executing the
                        SDK-dictated reference implementation for the
                        composite(s), and any "fast" or "prod" variant
                        would simply do the current implementation.
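Omitting the node amounts to a small graph rewrite. A sketch under simplifying assumptions: one input and one output per node, `Node` and `elideReshuffles` are hypothetical and not Prism's code in handlerunner.go, and the reshuffle URN is assumed to be the unqualified one linked in [0].

```go
package main

import "fmt"

// Node is a hypothetical, simplified transform with one input and one output
// PCollection ID. Real pipeline protos allow many of each.
type Node struct {
	ID     string
	URN    string
	Input  string
	Output string
}

// reshuffleURN is assumed to match the unqualified reshuffle URN.
const reshuffleURN = "beam:transform:reshuffle:v1"

// elideReshuffles drops every reshuffle node and rewires its consumers to
// read the reshuffle's input directly; this treats the reshuffle as a pure
// no-op, as on a local single-transform-per-bundle runner.
func elideReshuffles(nodes []Node) []Node {
	alias := map[string]string{} // reshuffle output PCollection -> its input
	for _, n := range nodes {
		if n.URN == reshuffleURN {
			alias[n.Output] = n.Input
		}
	}
	// resolve follows chains of back-to-back reshuffles to the original PCollection.
	resolve := func(pc string) string {
		for {
			src, ok := alias[pc]
			if !ok {
				return pc
			}
			pc = src
		}
	}
	var out []Node
	for _, n := range nodes {
		if n.URN == reshuffleURN {
			continue
		}
		n.Input = resolve(n.Input) // n is a copy, so the caller's slice is untouched
		out = append(out, n)
	}
	return out
}

func main() {
	// The "example:" URNs are made up for illustration.
	nodes := []Node{
		{ID: "read", URN: "example:read", Output: "pc1"},
		{ID: "reshuffle", URN: reshuffleURN, Input: "pc1", Output: "pc2"},
		{ID: "stable", URN: "example:pardo", Input: "pc2", Output: "pc3"},
	}
	for _, n := range elideReshuffles(nodes) {
		fmt.Printf("%s reads %q\n", n.ID, n.Input)
	}
}
```

Because the rewrite discards the reshuffle entirely, it also discards any fusion break, which is only harmless on a runner that already executes each transform in its own bundle.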


                    Very nice!

                    And of course I should have linked out to the
                    existing reshuffle URN in the proto.

                    Kenn


                        Robert Burke
                        Beam Go Busybody

                        [0]:
                        
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
                        [1]:
                        
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82



                        On 2023/09/26 15:43:53 Kenneth Knowles wrote:
                        > Hi everyone,
                        >
                        > Recently there was a bug [1] caused by
                        > discrepancies between two of Dataflow's
                        > reshuffle implementations. I think the
                        > reference implementation in the Java SDK [2]
                        > also does not match. This all led to
                        > discussion on the bug and the pull request
                        > [3] about what the actual semantics should
                        > be. I got it wrong, maybe multiple times. So
                        > I wrote up a very short document to finish
                        > the discussion:
                        >
                        > https://s.apache.org/beam-reshuffle
                        >
                        > This is also probably among the simplest
                        > imaginable uses of
                        > http://s.apache.org/ptransform-design-doc in
                        > case you want to see roughly how I intended
                        > it to be used.
                        >
                        > Kenn
                        >
                        > [1] https://github.com/apache/beam/issues/28219
                        > [2] https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
                        > [3] https://github.com/apache/beam/pull/28272
                        >
