On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:

> > Not quite IMO. It is a subtle difference. Perhaps these transforms can
> be *implemented* using stateful DoFn, but defining their semantics directly
> at a high level is more powerful. The higher level we can make transforms,
> the more flexibility we have in the runners. You *could* suggest that we
> take the same approach as we do with Combine: not a primitive, but a
> special transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
> Yes, semantics > optimizations. For optimizations Beam already has a
> facility - PTransformOverride. There is no fundamental difference about how
> we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart
> runners will not use that expansion". This is essentially the root of this
> discussion.
>
> If I rephrase it:
>
>  a) why do we distinguish between "some" actually composite transforms
> treating them as primitive, while others have expansions, although the
> fundamental reasoning seems the same for both (performance)?
>
>  b) is there a fundamental reason why we do not support stateful DoFn for
> merging windows?
>
Mostly because we would need the API to include a merge capability, and
that has never been implemented.


> I feel that these are related and have historical reasons, but I'd like to
> know that for sure. :)
>
>  Jan
> On 10/24/22 19:59, Kenneth Knowles wrote:
>
>
>
> On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> On 10/22/22 21:47, Reuven Lax via dev wrote:
>>
>> I think we stated that CoGroupbyKey was also a primitive, though in
>> practice it's implemented in terms of GroupByKey today.
>>
>> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>>
>>>
>>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have some missing pieces in my understanding of the set of Beam's
>>>> primitive transforms, which I'd like to fill. First a quick recap of what I
>>>> think is the current state. We have (basically) the following primitive
>>>> transforms:
>>>>
>>>>  - DoFn (stateless, stateful, splittable)
>>>>
>>>>  - Window
>>>>
>>>>  - Impulse
>>>>
>>>>  - GroupByKey
>>>>
>>>>  - Combine
>>>>
>>>
>>> Not a primitive, just a well-defined transform that runners can execute
>>> in special ways.
>>>
>> Yep, OK, agree. Performance is orthogonal to semantics.
>>
>>
>>>
>>>>
>>>>
>>>>  - Flatten (pCollections)
>>>>
>>>
>>> The rest, yes.
>>>
>>>
>>>
>>>> Inside runners, we most often transform GBK into ReduceFn
>>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>>> DoFn.
>>>>
>>>
>>> ReduceFnRunner is for windowing / triggers and has special feature to
>>> use a CombineFn while doing it. Nothing to do with stateful DoFn.
>>>
>> My bad, wrong wording. The point was that *all* of the semantics of GBK
>> and Combine can be defined in terms of stateful DoFn. There are some
>> changes needed to stateful DoFn to support the Combine functionality. But
>> as mentioned above - optimization is orthogonal to semantics.
>>
>
> Not quite IMO. It is a subtle difference. Perhaps these transforms can be
> *implemented* using stateful DoFn, but defining their semantics directly at
> a high level is more powerful. The higher level we can make transforms, the
> more flexibility we have in the runners. You *could* suggest that we take
> the same approach as we do with Combine: not a primitive, but a special
> transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
>>
>>>
>>>
>>>> I'll compare this to the set of transforms we used to use in Euphoria
>>>> (currently java SDK extension):
>>>>
>>>>  - FlatMap ~~ stateless DoFn
>>>>
>>>>  - Union ~~ Flatten
>>>>
>>>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>>
>>>
>>> Stateful DoFn does not require associative or commutative operation,
>>> while reduce/combine does. Windowing is really just a secondary key for
>>> GBK/Combine that allows completion of unbounded aggregations but has no
>>> computation associated with it.
>>>
>> Merging WindowFn contains some computation. The fact that stateful DoFn
>> do not require specific form of reduce function is precisely what makes it
>> the actual primitive, no?
>>
>>
>>>
>>>
>>>>  - (missing Impulse)
>>>>
>>>
>>> Then you must have some primitive sources with splitting?
>>>
>>>
>>>>  - (missing splittable DoFn)
>>>>
>>>
>>> Kind of the same question - SDF is the one and only primitive that
>>> creates parallelism.
>>>
>> Original Euphoria had an analogy to (Un)boundedReader. The SDK extension
>> in Beam works on top of PCollecions and therefore does not deal with IOs.
>>
>>
>>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn"
>>>> - i.e. the state might be created pre-shuffle, on trigger the state is
>>>> shuffled and then merged. In Beam we already have CombiningState and
>>>> MergingState facility (sort of), which is what is needed, we just do not
>>>> have the ability to shuffle the partial states and then combine them. This
>>>> also relates to the inability to run stateful DoFn for merging windowFns,
>>>> because that is needed there as well. Is this something that is
>>>> fundamentally impossible to define for all runners? What is worth noting is
>>>> that building, shuffling and merging the state before shuffle requires
>>>> compatible trigger (purely based on watermark), otherwise the transform
>>>> fall-backs to "classical DoFn".
>>>>
>>>
>>> Stateful DoFn for merging windows can be defined. You could require all
>>> state to be mergeable and then it is automatic. Or you could have an
>>> "onMerge" callback. These should both be fine. The automatic version is
>>> less likely to have nonsensical semantics, but allowing the callback to do
>>> "whatever it wants" whether the result is good or not is more consistent
>>> with the design of stateful DoFn.
>>>
>> Yes, but this is the same for CombineFn, right? The merge (or combine)
>> has to be correctly aligned with the computation. The current situation is
>> that we do not support stateful DoFns for merging WindowFn [1].
>>
>>
>>> Whether and where a shuffle takes place may vary. Start with the maths.
>>>
>> Shuffle happens at least whenever there is a need to regroup keys. I'm
>> not sure which maths you refer to, can you clarify please?
>>
>>  Jan
>>
>> [1]
>> https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
>>
>>
>>> Kenn
>>>
>>>
>>>> Bottom line: I'm thinking of proposing to drop Euphoria extension,
>>>> because it has essentially no users and actually no maintainers, but I have
>>>> a feeling there is a value in the set of operators that could be
>>>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>>>> users to have access to a "combining stateful DoFn" primitive (even better
>>>> would be "combining splittable DoFn").
>>>>
>>>> Looking forward to any comments on this.
>>>>
>>>>  Jan
>>>>
>>>>
>>>>

Reply via email to