I think we stated that CoGroupByKey was also a primitive, though in
practice it's implemented in terms of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:

>
>
> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> I have some missing pieces in my understanding of the set of Beam's
>> primitive transforms, which I'd like to fill. First a quick recap of what I
>> think is the current state. We have (basically) the following primitive
>> transforms:
>>
>>  - DoFn (stateless, stateful, splittable)
>>
>>  - Window
>>
>>  - Impulse
>>
>>  - GroupByKey
>>
>>  - Combine
>>
>
> Not a primitive, just a well-defined transform that runners can execute in
> special ways.
>
>
>>
>>
>>  - Flatten (pCollections)
>>
>
> The rest, yes.
>
>
>
>> Inside runners, we most often transform GBK into ReduceFn
>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>> DoFn.
>>
>
> ReduceFnRunner is for windowing / triggers and has a special feature to use
> a CombineFn while doing it. Nothing to do with stateful DoFn.
>
>
>
>> I'll compare this to the set of transforms we used to use in Euphoria
>> (currently a Java SDK extension):
>>
>>  - FlatMap ~~ stateless DoFn
>>
>>  - Union ~~ Flatten
>>
>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>
>
> Stateful DoFn does not require an associative or commutative operation, while
> reduce/combine does. Windowing is really just a secondary key for
> GBK/Combine that allows completion of unbounded aggregations but has no
> computation associated with it.
>
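To make the "windowing is a secondary key" point concrete, here is a minimal sketch in plain Python rather than the Beam API. The names `assign_window` and `group_by_key_and_window` and the 60-second fixed window are assumptions made for illustration only. Grouping happens on the composite (key, window), and the window assignment itself does no computation on the values.

```python
from collections import defaultdict

WINDOW_SIZE = 60  # seconds; hypothetical fixed-window size


def assign_window(timestamp, size=WINDOW_SIZE):
    """Return the [start, end) fixed window that contains the timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def group_by_key_and_window(elements):
    """Group (key, value, timestamp) triples by the composite (key, window)."""
    groups = defaultdict(list)
    for key, value, timestamp in elements:
        # The window acts purely as a second grouping key.
        groups[(key, assign_window(timestamp))].append(value)
    return dict(groups)


events = [("a", 1, 5), ("a", 2, 30), ("a", 3, 65), ("b", 4, 10)]
grouped = group_by_key_and_window(events)
```

Because each group is bounded by its window, an aggregation over an unbounded input can complete per (key, window) once the watermark passes the end of that window.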
>
>
>>  - (missing Impulse)
>>
>
> Then you must have some primitive sources with splitting?
>
>
>>  - (missing splittable DoFn)
>>
>
> Kind of the same question - SDF is the one and only primitive that creates
> parallelism.
>
>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn" -
>> i.e. the state might be created pre-shuffle, on trigger the state is
>> shuffled and then merged. In Beam we already have CombiningState and
>> MergingState facility (sort of), which is what is needed, we just do not
>> have the ability to shuffle the partial states and then combine them. This
>> also relates to the inability to run stateful DoFn for merging windowFns,
>> because that is needed there as well. Is this something that is
>> fundamentally impossible to define for all runners? What is worth noting is
>> that building, shuffling and merging the state before shuffle requires
>> a compatible trigger (purely based on watermark), otherwise the transform
>> falls back to a "classical DoFn".
>>
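The "build state pre-shuffle, shuffle it, then merge" idea can be sketched in plain Python as follows. This is not Beam or Euphoria code: `SumCountState`, `build_partial_states` and `shuffle_and_merge` are hypothetical names, and the sketch assumes the state's merge operation is associative and commutative, which is what makes merging the partial states in any order safe.

```python
from collections import defaultdict


class SumCountState:
    """Hypothetical mergeable state: running sum and count (enough for a mean)."""

    def __init__(self, total=0, count=0):
        self.total = total
        self.count = count

    def add(self, value):
        self.total += value
        self.count += 1

    def merge(self, other):
        # Associative and commutative, so partial states merge in any order.
        return SumCountState(self.total + other.total, self.count + other.count)

    def extract(self):
        return self.total / self.count


def build_partial_states(bundle):
    """Pre-shuffle step: build one partial state per key within a bundle."""
    states = defaultdict(SumCountState)
    for key, value in bundle:
        states[key].add(value)
    return dict(states)


def shuffle_and_merge(partials):
    """Post-shuffle step: merge the partial states that share a key."""
    merged = {}
    for partial in partials:
        for key, state in partial.items():
            merged[key] = merged[key].merge(state) if key in merged else state
    return merged


bundles = [[("a", 1), ("b", 10), ("a", 3)], [("a", 5), ("b", 20)]]
merged = shuffle_and_merge(build_partial_states(b) for b in bundles)
means = {key: state.extract() for key, state in merged.items()}
```

Only the compact partial states cross the shuffle here, not the raw elements. A state without such a merge operation would have to ship every element, which corresponds to the fall-back case described above.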
>
> Stateful DoFn for merging windows can be defined. You could require all
> state to be mergeable and then it is automatic. Or you could have an
> "onMerge" callback. These should both be fine. The automatic version is
> less likely to have nonsensical semantics, but allowing the callback to do
> "whatever it wants" whether the result is good or not is more consistent
> with the design of stateful DoFn.
>
> Whether and where a shuffle takes place may vary. Start with the maths.
>
> Kenn
>
>
>> Bottom line: I'm thinking of proposing to drop the Euphoria extension,
>> because it has essentially no users and actually no maintainers, but I have
>> a feeling there is value in the set of operators that could be
>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>> users to have access to a "combining stateful DoFn" primitive (even better
>> would be "combining splittable DoFn").
>>
>> Looking forward to any comments on this.
>>
>>  Jan
>>
>>
>>
