On Fri, Oct 13, 2023 at 12:51 PM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi,
>
> I think nearly everything has already been said in this thread, but ...
> it is time for Friday discussions. :)
>
> Today I recalled a discussion we had a long time ago, when we were
> designing Euphoria (btw, deprecating and removing it is still on my todo
> list, I should create a vote thread for that). We had 4 primitives:
>
>  a) non-shuffle, stateless ~ stateless ParDo
>
>  b) shuffle, stateful ~ stateful ParDo, with the ability (under the right 
> circumstances,  i.e. defined event-time trigger, defined state merge 
> function, ...) to be performed in a "combinable way".
>
>  c) shuffle, stateless ~ Reshuffle
>
>  d) non-shuffle, stateful - nope, makes no sense :) - part of the "combinable 
> stateful shuffle operation"
>
>  e) union ~ Flatten
>
> Turns out you can build everything bottom up from these.
>
> Now, the not-so-well defined semantics of Reshuffle (Redistribute) might 
> arise from the fact it is not a primitive. Stateless shuffling of data is 
> definitely a primitive of all runners.

Not Dataflow :-)

But more importantly, Beam primitives are deliberately chosen to be
fundamental data operations, not physical plan steps that a runner
might use. In other words, Beam is decidedly _not_ a library for
building composites that eventually are constructed from runner
primitives. It is more like SQL in that it is a library for building
composites that eventually are constructed from fundamental operations
on data, that every engine (like every RDBMS) will be able to
implement in its own way.

Kenn

>
> Therefore here goes the question - should Redistribute be a primitive and not 
> be built up from other transforms?
>
> Best,
>
>  Jan
>
> On 10/6/23 21:12, Kenneth Knowles wrote:
>
>
>
> On Fri, Oct 6, 2023 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>
>> On 10/6/23 15:11, Kenneth Knowles wrote:
>>
>>
>>
>> On Fri, Oct 6, 2023 at 3:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> Hi,
>>>
>>> there is also one other thing to mention in relation to
>>> Reshuffle/RequiresStableInput, and that is that our current implementation
>>> of RequiresStableInput can break without Reshuffle in some corner cases on
>>> most portable runners, at least with Java GreedyPipelineFuser, see [1]. The
>>> only way to work around this currently is inserting Reshuffle (or any other
>>> fusion-break transform) directly before the stable DoFn (Reshuffle is 
>>> handy, because it does not change the data). I think we should either 
>>> somehow fix the issue [1] or include fusion break as a mandatory 
>>> requirement for the new Redistribute transform as well (at least with some 
>>> variant) or possibly add a new "hint" for non-optional fusion breaking.
>>
>> This is actually the bug we have wanted to fix for years - redistribute has 
>> nothing to do with checkpointing or stable input and Reshuffle incorrectly 
>> merges the two concepts.
>>
>> I agree that we couldn't make any immediate change that will break a runner. 
>> I believe runners that depend on Reshuffle to provide stable input will also 
>> provide stable input after GroupByKey. Since the SDK expansion of Reshuffle
>> will still contain a GBK, those runners' functionality will be unchanged.
>>
>> I don't yet have a firm opinion between these approaches:
>>
>> 1. Adjust the Java SDK implementation of Reshuffle (and maybe other SDKs if 
>> needed). With some flag so that users can use the old wrong behavior for 
>> update compatibility.
>> 2. Add a Redistribute transform to the SDKs that has the right behavior and 
>> leave Reshuffle as it is.
>> 1+2. Add the Redistribute transform but also make Reshuffle call it, so 
>> Reshuffle also gets the new behavior, with the same flag so that users can 
>> use the old wrong behavior for update compatibility.
>>
>> All of these will leave "Reshuffle for RequiresStableInput" alone for now.
>> The options that include (2) will move us a little closer to migrating to a 
>> "better" future state.
>>
>> I might not have expressed myself the right way. I understand that Reshuffle
>> having "stable input" functionality is a non-portable side effect. It would be
>> nice to get rid of it, and my impression from this thread was that we would
>> try to deprecate Reshuffle and introduce Redistribute, which will not have such
>> semantics. All of this is fine; the problem is that we currently (in some corner
>> cases) rely on Reshuffle *even though* the Pipeline uses @RequiresStableInput.
>> That is due to the fact that Reshuffle also ensures fusion breaking. Fusing a
>> non-deterministic DoFn with a stable DoFn breaks the stable input property,
>> because runners can ensure stability only at the input of an executable stage.
>> Therefore we would either need to:
>>
>>  a) define Redistribute as being an unconditional fusion break boundary, or
>>
>>  b) define some other transform or hint to be able to enforce fusion breaking
>>
>> Otherwise I'd be in favor of 2 and deprecation of Reshuffle.
>
>
> Just to be very clear - my goal right now is to just give Reshuffle
> consistent semantics. Even for the old "stable input + redistribute" use of
> Reshuffle, the semantics are inconsistent/undefined and the Java SDK
> expansion is wrong. Changing things having to do with stable input is not
> part of what I am trying to change right now. But it is fine to do things
> that prepare for that.
>
> Kenn
>
>>
>>  Jan
>>
>>
>> Any votes? Any other options?
>>
>> Kenn
>>
>>>  Jan
>>>
>>> [1] https://github.com/apache/beam/issues/24655
>>>
>>> On 10/5/23 21:01, Robert Burke wrote:
>>>
>>> Reshuffle/redistribute being a transform has the benefit of allowing 
>>> existing runners that aren't updated to be aware of the new urns to rely on 
>>> an SDK side implementation, which may be more expensive than what the 
>>> runner is able to do with that awareness.
>>>
>>> Aka: it gives purpose to the fallback implementations.
>>>
>>> On Thu, Oct 5, 2023, 9:03 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>> Another perspective, ignoring runners' custom implementations and non-Java
>>>> SDKs, could be that the semantics are perfectly well defined: it is a
>>>> composite and its semantics are defined by its implementation in terms of 
>>>> primitives. It is just that this expansion is not what we want so we 
>>>> should not use it (and also we shouldn't use "whatever the implementation 
>>>> does" as a spec for anything we care about).
>>>>
>>>> On Thu, Oct 5, 2023 at 11:56 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>
>>>>> I totally agree. I am motivated right now by the fact that it is already 
>>>>> used all over the place but with no consistent semantics. Maybe it is 
>>>>> simpler to focus on just making the minimal change, which would basically 
>>>>> be to update the expansion of the Reshuffle in the Java SDK.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, Oct 5, 2023 at 11:39 AM John Casey <theotherj...@google.com> 
>>>>> wrote:
>>>>>>
>>>>>> Given that this is a hint, I'm not sure redistribute should be a 
>>>>>> PTransform as opposed to some other way to hint to a runner.
>>>>>>
>>>>>> I'm not sure of what the syntax of that would be, but a semantic no-op 
>>>>>> transform that the runner may or may not do anything with is odd.
>>>>>>
>>>>>> On Thu, Oct 5, 2023 at 11:30 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>
>>>>>>> So a high level suggestion from Robert that I want to highlight as a 
>>>>>>> top-post:
>>>>>>>
>>>>>>> Instead of focusing on just fixing the SDKs' and runners' Reshuffle, this
>>>>>>> could be an opportunity to introduce Redistribute which was proposed in 
>>>>>>> the long-ago thread. The semantics are identical but it is more clear 
>>>>>>> that it only is a hint about redistributing data and there is no 
>>>>>>> expectation of a checkpoint.
>>>>>>>
>>>>>>> This new name may also be an opportunity to maintain update 
>>>>>>> compatibility (though this may actually be leaving unsafe code in 
>>>>>>> users' hands) and/or separate @RequiresStableInput/checkpointing uses
>>>>>>> of Reshuffle from redistribution-only uses of Reshuffle.
>>>>>>>
>>>>>>> Any other thoughts on this one high level bit?
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Thu, Oct 5, 2023 at 11:15 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 4, 2023 at 7:45 PM Robert Burke <lostl...@apache.org> 
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> LGTM.
>>>>>>>>>
>>>>>>>>> It looks like the Go SDK already adheres to these semantics as well
>>>>>>>>> for the reference impl (well, reshuffle/redistribute_randomly;
>>>>>>>>> _by_key isn't implemented in the Go SDK, and it only uses the
>>>>>>>>> existing unqualified reshuffle URN [0]).
>>>>>>>>>
>>>>>>>>> The original strategy, and then for every element, the original 
>>>>>>>>> Window, TS, and Pane are all serialized, shuffled, and then 
>>>>>>>>> deserialized downstream.
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L65
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go#L145
>>>>>>>>>
>>>>>>>>> Prism at the moment vacuously implements reshuffle by omitting the
>>>>>>>>> node, and rewriting the inputs and outputs [1], as it's a local
>>>>>>>>> runner with single transform per bundle execution, but I was
>>>>>>>>> intending to make it a fusion break regardless.  Ultimately prism's
>>>>>>>>> "test" variant will default to executing the SDK's dictated reference
>>>>>>>>> implementation for the composite(s), and any "fast" or "prod" variant
>>>>>>>>> would simply do the current implementation.
>>>>>>>>
>>>>>>>>
>>>>>>>> Very nice!
>>>>>>>>
>>>>>>>> And of course I should have linked out to the existing reshuffle URN 
>>>>>>>> in the proto.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Robert Burke
>>>>>>>>> Beam Go Busybody
>>>>>>>>>
>>>>>>>>> [0]: 
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L46C3-L46C50
>>>>>>>>> [1]: 
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go#L82
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 2023/09/26 15:43:53 Kenneth Knowles wrote:
>>>>>>>>> > Hi everyone,
>>>>>>>>> >
>>>>>>>>> > Recently there was a bug [1] caused by discrepancies between two of
>>>>>>>>> > Dataflow's reshuffle implementations. I think the reference 
>>>>>>>>> > implementation
>>>>>>>>> > in the Java SDK [2] also does not match. This all led to discussion 
>>>>>>>>> > on the
>>>>>>>>> > bug and the pull request [3] about what the actual semantics should 
>>>>>>>>> > be. I
>>>>>>>>> > got it wrong, maybe multiple times. So I wrote up a very short 
>>>>>>>>> > document to
>>>>>>>>> > finish the discussion:
>>>>>>>>> >
>>>>>>>>> >     https://s.apache.org/beam-reshuffle
>>>>>>>>> >
>>>>>>>>> > This is also probably among the simplest imaginable use of
>>>>>>>>> > http://s.apache.org/ptransform-design-doc in case you want to see 
>>>>>>>>> > kind of
>>>>>>>>> > how I intended it to be used.
>>>>>>>>> >
>>>>>>>>> > Kenn
>>>>>>>>> >
>>>>>>>>> > [1] https://github.com/apache/beam/issues/28219
>>>>>>>>> > [2]
>>>>>>>>> > https://github.com/apache/beam/blob/d52b077ad505c8b50f10ec6a4eb83d385cdaf96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L84
>>>>>>>>> > [3] https://github.com/apache/beam/pull/28272
>>>>>>>>> >
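
For reference, the Go SDK behavior Robert describes earlier in the thread
(each element's window, timestamp, and pane serialized, shuffled, and then
deserialized downstream) can be sketched in plain Python. This is only an
illustrative model under stated assumptions, not Beam code: WindowedValue,
encode, decode, redistribute_sketch, num_shards, and the JSON encoding are
all hypothetical names and choices made up for this example.

```python
import json
import random
from collections import defaultdict
from typing import Any, List, NamedTuple, Optional, Tuple


class WindowedValue(NamedTuple):
    """Hypothetical stand-in for an element plus its windowing metadata."""
    value: Any
    timestamp: int
    window: Tuple[int, int]  # (start, end), an assumed representation
    pane: str


def encode(wv: WindowedValue) -> bytes:
    # Serialize the value together with its timestamp, window, and pane.
    return json.dumps([wv.value, wv.timestamp, list(wv.window), wv.pane]).encode("utf-8")


def decode(payload: bytes) -> WindowedValue:
    # Restore the original metadata after the shuffle.
    value, timestamp, window, pane = json.loads(payload.decode("utf-8"))
    return WindowedValue(value, timestamp, tuple(window), pane)


def redistribute_sketch(elements: List[WindowedValue],
                        num_shards: int = 4,
                        seed: Optional[int] = None) -> List[WindowedValue]:
    rng = random.Random(seed)
    # Pair every encoded element with a random key; the dict of lists
    # stands in for the grouping step of a shuffle.
    shards = defaultdict(list)
    for wv in elements:
        shards[rng.randrange(num_shards)].append(encode(wv))
    # Drop the keys and decode, so elements come out unchanged except
    # for how they are distributed and ordered.
    return [decode(p) for key in sorted(shards) for p in shards[key]]
```

The point of the sketch is the contract rather than the mechanism: the output
holds the same elements with the same timestamps, windows, and panes as the
input, and only their grouping and order may differ.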
