On Fri, Oct 13, 2023 at 12:51 PM Jan Lukavský <je...@seznam.cz> wrote: > > Hi, > > I think nearly everything has already been said in this thread, but ... > it is time for Friday discussions. :) > > Today I recalled a discussion we had a long time ago, when we were > designing Euphoria (btw, deprecating and removing it is still on my todo > list, I should create a vote thread for that). We had 4 primitives: > > a) non-shuffle, stateless ~ stateless ParDo > > b) shuffle, stateful ~ stateful ParDo, with the ability (under the right > circumstances, i.e. defined event-time trigger, defined state merge > function, ...) to be performed in a "combinable way". > > c) shuffle, stateless ~ Reshuffle > > d) non-shuffle, stateful - nope, makes no sense :) - part of the "combinable > stateful shuffle operation" > > e) union ~ Flatten > > Turns out you can build everything bottom up from these. > > Now, the not-so-well-defined semantics of Reshuffle (Redistribute) might > arise from the fact that it is not a primitive. Stateless shuffling of data is > definitely a primitive of all runners.
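Jan's list of primitives can be made concrete with a toy, single-process model. This is only an illustration. The function names are hypothetical and are not Beam APIs, a "worker" is just a Python list, and (d) is left out because the list itself rules it out. The sketch builds a word-count composite bottom up from (a), (b) and (e), and shows that (c) moves elements without changing them.

```python
import random
from typing import Any, Callable, Dict, Hashable, Iterable, List, Tuple


def par_do(elements: Iterable[Any], fn: Callable[[Any], Iterable[Any]]) -> List[Any]:
    """(a) Non-shuffle, stateless: apply fn to each element independently."""
    return [out for element in elements for out in fn(element)]


def stateful_shuffle(
    pairs: Iterable[Tuple[Hashable, Any]],
    merge: Callable[[Any, Any], Any],
    initial: Any,
) -> List[Tuple[Hashable, Any]]:
    """(b) Shuffle, stateful: bring each key's values together and fold them into per-key state."""
    state: Dict[Hashable, Any] = {}
    for key, value in pairs:
        state[key] = merge(state.get(key, initial), value)
    return list(state.items())


def stateless_shuffle(elements: Iterable[Any], num_workers: int, rng: random.Random) -> List[List[Any]]:
    """(c) Shuffle, stateless: send each element, unchanged, to an arbitrary worker."""
    workers: List[List[Any]] = [[] for _ in range(num_workers)]
    for element in elements:
        workers[rng.randrange(num_workers)].append(element)
    return workers


def flatten(*collections: Iterable[Any]) -> List[Any]:
    """(e) Union of several collections."""
    return [element for collection in collections for element in collection]


# A word-count composite built only from the primitives above.
lines = flatten(["a b", "b"], ["c a"])
words = par_do(lines, lambda line: [(word, 1) for word in line.split()])
counts = dict(stateful_shuffle(words, lambda acc, value: acc + value, 0))
spread = stateless_shuffle(words, num_workers=3, rng=random.Random(42))
```

Here `counts` is `{"a": 2, "b": 2, "c": 1}`. Flattening `spread` back together yields the same elements as `words`, only placed on different workers.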
Not Dataflow :-) But more importantly, Beam primitives are deliberately chosen to be fundamental data operations, not physical plan steps that a runner might use. In other words, Beam is decidedly _not_ a library for building composites that eventually are constructed from runner primitives. It is more like SQL in that it is a library for building composites that eventually are constructed from fundamental operations on data, which every engine (like every RDBMS) will be able to implement in its own way. Kenn > > Therefore here goes the question - should Redistribute be a primitive and not > be built up from other transforms? > > Best, > > Jan > > On 10/6/23 21:12, Kenneth Knowles wrote: > > > > On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote: >> >> >> On 10/6/23 15:11, Kenneth Knowles wrote: >> >> >> >> On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote: >>> >>> Hi, >>> >>> there is also one other thing to mention in relation to >>> Reshuffle/RequiresStableInput and that is that our current implementation >>> of RequiresStableInput can break without Reshuffle in some corner cases on >>> most portable runners, at least with Java GreedyPipelineFuser, see [1]. The >>> only way to work around this currently is inserting Reshuffle (or any other >>> fusion-break transform) directly before the stable DoFn (Reshuffle is >>> handy, because it does not change the data). I think we should either >>> somehow fix the issue [1] or include fusion break as a mandatory >>> requirement for the new Redistribute transform as well (at least with some >>> variant) or possibly add a new "hint" for non-optional fusion breaking. >> >> This is actually the bug we have wanted to fix for years - redistribute has >> nothing to do with checkpointing or stable input and Reshuffle incorrectly >> merges the two concepts. >> >> I agree that we couldn't make any immediate change that will break a runner.
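The fusion problem Jan describes can be illustrated with a deliberately simplified model of greedy fusion over a linear chain of transforms. This is not the logic of the Java GreedyPipelineFuser. It assumes two things: fusion is cut only at a GroupByKey or Reshuffle, and a runner guarantees stable input only to the first transform of each fused stage. The transform names are hypothetical.

```python
from typing import List, Set

# Assumption for this toy model: only these transforms force a stage boundary.
FUSION_BREAKS: Set[str] = {"GroupByKey", "Reshuffle"}


def fuse_linear(chain: List[str]) -> List[List[str]]:
    """Greedily fuse a linear chain of transforms, cutting only at fusion breaks."""
    stages: List[List[str]] = [[]]
    for name in chain:
        if name in FUSION_BREAKS:
            # The runner materializes data here; the next stage reads it back.
            stages.append([])
        else:
            stages[-1].append(name)
    return [stage for stage in stages if stage]


def has_stable_input(stages: List[List[str]], transform: str) -> bool:
    """In this model, only a stage's first transform reads persisted (stable) data."""
    return any(stage[0] == transform for stage in stages)


without_break = fuse_linear(["Read", "AssignRandomIds", "StableWrite"])
with_break = fuse_linear(["Read", "AssignRandomIds", "Reshuffle", "StableWrite"])
```

Without the break, `StableWrite` is fused behind the non-deterministic `AssignRandomIds` and does not start a stage. With the break, it heads its own stage. If the break were only an optional hint that a runner might drop, `StableWrite` would land back in the first stage.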
>> I believe runners that depend on Reshuffle to provide stable input will also >> provide stable input after GroupByKey. Since the SDK expansion of Reshuffle >> will still contain a GBK, those runners' functionality will be unchanged. >> >> I don't yet have a firm preference among these approaches: >> >> 1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs if >> needed), with some flag so that users can use the old wrong behavior for >> update compatibility. >> 2. Add a Redistribute transform to the SDKs that has the right behavior and >> leave Reshuffle as it is. >> 1+2. Add the Redistribute transform but also make Reshuffle call it, so >> Reshuffle also gets the new behavior, with the same flag so that users can >> use the old wrong behavior for update compatibility. >> >> All of these will leave "Reshuffle for RequiresStableInput" alone for now. >> The options that include (2) will move us a little closer to migrating to a >> "better" future state. >> >> I might not have expressed myself the right way. I understand that Reshuffle having >> "stable input" functionality is a non-portable side effect. It would be nice >> to get rid of it and my impression from this thread was that we would try to >> deprecate Reshuffle and introduce Redistribute which will not have such >> semantics. All of this is fine; the problem is that we currently (in some corner >> cases) rely on Reshuffle *even though* the Pipeline uses @RequiresStableInput. >> That is due to the fact that Reshuffle also ensures fusion breaking. Fusing >> a non-deterministic DoFn with a stable DoFn breaks the stable input property, >> because runners can ensure stability only at the input of an executable stage. >> Therefore we would either need to: >> >> a) define Redistribute as being an unconditional fusion break boundary, or >> >> b) define some other transform or hint to be able to enforce fusion breaking >> >> Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
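A small retry simulation shows why stability can be ensured only at the input of an executable stage. It rests on two assumptions. Every attempt except the last fails after the stable step has started, and a failed stage is retried from its own input. `observed_inputs` is a hypothetical helper, not a runner API.

```python
import random
from typing import List, Optional


def observed_inputs(inputs: List[int], rng: random.Random, fused: bool, attempts: int = 2) -> List[List[float]]:
    """Return the batch the stable step sees on each attempt of its stage.

    fused=True: the non-deterministic step is in the same stage, so a retry
    re-runs it. fused=False: a fusion break persisted its output first.
    """
    materialized: Optional[List[float]] = None
    seen: List[List[float]] = []
    for _ in range(attempts):
        if fused or materialized is None:
            # Non-deterministic step: executed (again) as part of this attempt.
            batch = [x + rng.random() for x in inputs]
            if not fused:
                # The upstream stage committed, so its output is persisted once.
                materialized = batch
        else:
            # Only the downstream stage is retried; it replays the persisted data.
            batch = materialized
        seen.append(batch)
    return seen


fused_runs = observed_inputs([1, 2, 3], random.Random(0), fused=True)
split_runs = observed_inputs([1, 2, 3], random.Random(0), fused=False)
```

With fusion, the stable step sees different values on the retry, which is exactly what @RequiresStableInput is meant to rule out. With the break, both attempts see identical input.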
> > > Just to be very clear - my goal right now is to just give Reshuffle > consistent semantics. Even for the old "stable input + redistribute" use of > Reshuffle, the semantics are inconsistent/undefined and the Java SDK > expansion is wrong. Changing things having to do with stable input is not > part of what I am trying to change right now. But it is fine to do things > that prepare for that. > > Kenn > >> >> Jan >> >> >> Any votes? Any other options? >> >> Kenn >> >>> Jan >>> >>> [1] https://github.com/apache/beam/issues/24655 >>> >>> On 10/5/23 21:01, Robert Burke wrote: >>> >>> Reshuffle/redistribute being a transform has the benefit of allowing >>> existing runners that aren't updated to be aware of the new URNs to rely on >>> an SDK-side implementation, which may be more expensive than what the >>> runner is able to do with that awareness. >>> >>> Aka: it gives purpose to the fallback implementations. >>> >>> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote: >>>> >>>> Another perspective, ignoring runners' custom implementations and non-Java >>>> SDKs, could be that the semantics are perfectly well defined: it is a >>>> composite and its semantics are defined by its implementation in terms of >>>> primitives. It is just that this expansion is not what we want so we >>>> should not use it (and also we shouldn't use "whatever the implementation >>>> does" as a spec for anything we care about). >>>> >>>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote: >>>>> >>>>> I totally agree. I am motivated right now by the fact that it is already >>>>> used all over the place but with no consistent semantics. Maybe it is >>>>> simpler to focus on just making the minimal change, which would basically >>>>> be to update the expansion of Reshuffle in the Java SDK.
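Treating Reshuffle as a composite means its behavior is whatever its expansion into primitives does. One such expansion assigns each element a random key, groups by key, and then drops the keys. The sketch below is an illustration, not the Java SDK's expansion. `WindowedValue` here is a hypothetical stand-in type with an integer window label. The key assumption is that each element's timestamp and window should come out unchanged, so the whole windowed value travels through the grouping. In Beam, a GroupByKey's outputs get timestamps from the windowing strategy rather than from the individual inputs, so an expansion that wants to preserve per-element metadata has to carry it through the grouping like this.

```python
import random
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class WindowedValue:
    """Hypothetical stand-in for an element plus its event-time metadata."""
    value: str
    timestamp: int  # event time, in arbitrary units for this sketch
    window: int     # stand-in label for the element's window


def group_by_key(pairs: List[Tuple[int, WindowedValue]]) -> Dict[int, List[WindowedValue]]:
    """Toy GroupByKey: collect values per key."""
    groups: Dict[int, List[WindowedValue]] = defaultdict(list)
    for key, wv in pairs:
        groups[key].append(wv)
    return groups


def reshuffle_via_random_key(elements: List[WindowedValue], num_keys: int, rng: random.Random) -> List[WindowedValue]:
    """Expansion: assign random keys -> GroupByKey -> drop keys.

    The whole WindowedValue (value, timestamp, window) is the grouped value,
    so every element comes out with its original metadata.
    """
    keyed = [(rng.randrange(num_keys), wv) for wv in elements]
    grouped = group_by_key(keyed)
    return [wv for key in grouped for wv in grouped[key]]


inputs = [WindowedValue("x", 10, 0), WindowedValue("y", 20, 0), WindowedValue("z", 70, 1)]
outputs = reshuffle_via_random_key(inputs, num_keys=4, rng=random.Random(7))
```

The order of `outputs` may differ from `inputs`, but the same elements come out with the same timestamps and windows.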
>>>>> >>>>> Kenn >>>>> >>>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com> >>>>> wrote: >>>>>> >>>>>> Given that this is a hint, I'm not sure redistribute should be a >>>>>> PTransform as opposed to some other way to hint to a runner. >>>>>> >>>>>> I'm not sure of what the syntax of that would be, but a semantic no-op >>>>>> transform that the runner may or may not do anything with is odd. >>>>>> >>>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org> wrote: >>>>>>> >>>>>>> So a high-level suggestion from Robert that I want to highlight as a >>>>>>> top-post: >>>>>>> >>>>>>> Instead of focusing on just fixing the SDKs' and runners' Reshuffle, this >>>>>>> could be an opportunity to introduce Redistribute which was proposed in >>>>>>> the long-ago thread. The semantics are identical but it is clearer >>>>>>> that it is only a hint about redistributing data and there is no >>>>>>> expectation of a checkpoint. >>>>>>> >>>>>>> This new name may also be an opportunity to maintain update >>>>>>> compatibility (though this may actually be leaving unsafe code in >>>>>>> users' hands) and/or separate @RequiresStableInput/checkpointing uses >>>>>>> of Reshuffle from redistribution-only uses of Reshuffle. >>>>>>> >>>>>>> Any other thoughts on this one high-level bit? >>>>>>> >>>>>>> Kenn >>>>>>> >>>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org> wrote: >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org> >>>>>>>> wrote: >>>>>>>>> >>>>>>>>> LGTM. >>>>>>>>> >>>>>>>>> It looks like the Go SDK already adheres to these semantics as well for >>>>>>>>> the reference impl (well, reshuffle/redistribute_randomly; _by_key >>>>>>>>> isn't implemented in the Go SDK, and it only uses the existing >>>>>>>>> unqualified reshuffle URN [0]).
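If Redistribute is a semantic no-op that a runner may or may not act on, then any implementation that emits the same multiset of elements is acceptable. The sketch below shows two hypothetical runner strategies. One skips the step entirely; the other spreads elements round-robin across workers. Both are assumptions for illustration, not Beam code. Compared as multisets with `collections.Counter`, their outputs are identical.

```python
from collections import Counter
from typing import Callable, Dict, List

Redistribute = Callable[[List[str]], List[List[str]]]


def noop_redistribute(elements: List[str]) -> List[List[str]]:
    """Runner ignores the hint: a single 'worker' keeps everything in place."""
    return [list(elements)]


def round_robin_redistribute(elements: List[str], num_workers: int = 3) -> List[List[str]]:
    """Runner honors the hint: spread elements evenly across workers."""
    workers: List[List[str]] = [[] for _ in range(num_workers)]
    for i, element in enumerate(elements):
        workers[i % num_workers].append(element)
    return workers


def observable_output(impl: Redistribute, elements: List[str]) -> Counter:
    """What a pipeline can observe downstream: the multiset, not the placement."""
    return Counter(e for worker in impl(elements) for e in worker)


data = ["a", "b", "b", "c", "d"]
results: Dict[str, Counter] = {
    "noop": observable_output(noop_redistribute, data),
    "round_robin": observable_output(round_robin_redistribute, data),
}
```

The catch, from the earlier discussion, is that the no-op variant provides no fusion break. A pipeline that relied on Reshuffle for stable input would silently lose that property.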
>>>>>>>>> >>>>>>>>> The original strategy, and then for every element, the original >>>>>>>>> Window, TS, and Pane are all serialized, shuffled, and then >>>>>>>>> deserialized downstream. >>>>>>>>> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65 >>>>>>>>> >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145 >>>>>>>>> >>>>>>>>> Prism at the moment vacuously implements reshuffle by omitting the >>>>>>>>> node and rewriting the inputs and outputs [1], as it's a local >>>>>>>>> runner with single-transform-per-bundle execution, but I was >>>>>>>>> intending to make it a fusion break regardless. Ultimately prism's >>>>>>>>> "test" variant will default to executing the SDK's dictated reference >>>>>>>>> implementation for the composite(s), and any "fast" or "prod" variant >>>>>>>>> would simply do the current implementation. >>>>>>>> >>>>>>>> >>>>>>>> Very nice! >>>>>>>> >>>>>>>> And of course I should have linked out to the existing reshuffle URN >>>>>>>> in the proto. >>>>>>>> >>>>>>>> Kenn >>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Robert Burke >>>>>>>>> Beam Go Busybody >>>>>>>>> >>>>>>>>> [0]: >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50 >>>>>>>>> [1]: >>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82 >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote: >>>>>>>>> > Hi everyone, >>>>>>>>> > >>>>>>>>> > Recently there was a bug [1] caused by discrepancies between two of >>>>>>>>> > Dataflow's reshuffle implementations. I think the reference >>>>>>>>> > implementation >>>>>>>>> > in the Java SDK [2] also does not match. 
This all led to discussion >>>>>>>>> > on the >>>>>>>>> > bug and the pull request [3] about what the actual semantics should >>>>>>>>> > be.
I >>>>>>>>> > got it wrong, maybe multiple times. So I wrote up a very short >>>>>>>>> > document to >>>>>>>>> > finish the discussion: >>>>>>>>> > >>>>>>>>> > https://s.apache.org/beam-reshuffle >>>>>>>>> > >>>>>>>>> > This is also probably among the simplest imaginable uses of >>>>>>>>> > http://s.apache.org/ptransform-design-doc in case you want to see >>>>>>>>> > kind of >>>>>>>>> > how I intended it to be used. >>>>>>>>> > >>>>>>>>> > Kenn >>>>>>>>> > >>>>>>>>> > [1] https://github.com/apache/beam/issues/28219 >>>>>>>>> > [2] >>>>>>>>> > https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84 >>>>>>>>> > [3] https://github.com/apache/beam/pull/28272 >>>>>>>>> >