On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I have some missing pieces in my understanding of the set of Beam's
> primitive transforms, which I'd like to fill. First, a quick recap of what I
> think is the current state. We have (basically) the following primitive
> transforms:
>
>  - DoFn (stateless, stateful, splittable)
>
>  - Window
>
>  - Impulse
>
>  - GroupByKey
>
>  - Combine
>

Not a primitive, just a well-defined transform that runners can execute in
special ways.


>
>
>  - Flatten (pCollections)
>

The rest, yes.
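The reason Combine does not need to be a primitive can be shown with a minimal sketch in plain Python (standard library only, not the Beam API). It expresses a per-key combine as a GroupByKey followed by a stateless per-group fold. The helpers `group_by_key` and `combine_per_key` are hypothetical. A runner is free to execute the same semantics more efficiently, for example by combining before the shuffle.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Toy GroupByKey: collect all values that share a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def combine_per_key(pairs, combine):
    """Per-key combine built only from the toy GroupByKey plus a stateless map."""
    return {key: combine(values) for key, values in group_by_key(pairs).items()}


print(combine_per_key([("a", 1), ("b", 2), ("a", 3)], sum))  # {'a': 4, 'b': 2}
```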



> Inside runners, we most often transform GBK into ReduceFn
> (ReduceFnRunner), which does the actual logic for both GBK and stateful
> DoFn.
>

ReduceFnRunner is for windowing / triggers and has a special feature to use a
CombineFn while doing it. It has nothing to do with stateful DoFn.
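One rough illustration of what "using a CombineFn while windowing" buys: each element is folded into a running accumulator as it arrives, instead of every element of a (key, window) pane being buffered until the trigger fires. The sketch below is a toy in plain Python and does not reflect ReduceFnRunner's actual design. It assumes fixed windows, a sum as the combine function, and a single end-of-window trigger driven by a watermark value the caller supplies. `ToyWindowedCombiner` and its methods are hypothetical names.

```python
class ToyWindowedCombiner:
    """Toy per-(key, window) sum aggregation with an end-of-window trigger.

    Elements are folded into a running accumulator on arrival instead of
    being buffered until the trigger fires. Fixed windows and a sum are
    assumed; this is not how ReduceFnRunner is actually structured.
    """

    def __init__(self, window_size):
        self.window_size = window_size
        self.accumulators = {}  # (key, window_start) -> running sum

    def on_element(self, key, value, timestamp):
        # Assign the element to its fixed window and fold it in immediately.
        start = timestamp - timestamp % self.window_size
        slot = (key, start)
        self.accumulators[slot] = self.accumulators.get(slot, 0) + value

    def on_watermark(self, watermark):
        """Emit and clear every window whose end is at or before the watermark."""
        fired = {}
        for (key, start), acc in list(self.accumulators.items()):
            if start + self.window_size <= watermark:
                fired[(key, start)] = acc
                del self.accumulators[(key, start)]
        return fired


combiner = ToyWindowedCombiner(window_size=10)
for key, value, ts in [("a", 1, 3), ("a", 2, 7), ("a", 5, 12)]:
    combiner.on_element(key, value, ts)
print(combiner.on_watermark(10))  # {('a', 0): 3}
print(combiner.on_watermark(20))  # {('a', 10): 5}
```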



> I'll compare this to the set of transforms we used in Euphoria
> (currently a Java SDK extension):
>
>  - FlatMap ~~ stateless DoFn
>
>  - Union ~~ Flatten
>
>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>

Stateful DoFn does not require an associative or commutative operation, while
reduce/combine does. Windowing is really just a secondary key for
GBK/Combine that allows completion of unbounded aggregations but has no
computation associated with it.
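The "secondary key" view of windowing can be made concrete with a minimal plain-Python sketch (standard library only, not Beam's API). Each timestamped element is assigned to a fixed window, and grouping then uses the composite (key, window) pair. `fixed_window` and `group_by_key_and_window` are hypothetical helpers. Watermarks and triggers, which are what let unbounded aggregations complete, are deliberately left out.

```python
from collections import defaultdict


def fixed_window(timestamp, size):
    """Return the [start, end) fixed window that contains timestamp."""
    start = timestamp - timestamp % size
    return (start, start + size)


def group_by_key_and_window(elements, size):
    """Group (key, value, timestamp) triples by the composite (key, window) key.

    The window only participates in the grouping key; nothing is computed
    from it, which is the "secondary key" view of windowing.
    """
    groups = defaultdict(list)
    for key, value, timestamp in elements:
        groups[(key, fixed_window(timestamp, size))].append(value)
    return dict(groups)


events = [("a", 1, 3), ("a", 2, 7), ("a", 5, 12), ("b", 4, 1)]
print(group_by_key_and_window(events, size=10))
# {('a', (0, 10)): [1, 2], ('a', (10, 20)): [5], ('b', (0, 10)): [4]}
```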



>  - (missing Impulse)
>

Then you must have some primitive sources with splitting?


>  - (missing splittable DoFn)
>

Kind of the same question - SDF is the one and only primitive that creates
parallelism.
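The sketch below gives a rough model of how an SDF creates parallelism. An element's restriction is treated as an offset range [start, end), which is split into independent chunks, and each chunk is processed on its own. Concatenating the per-chunk results reproduces the unsplit read. This is plain Python, not the Beam SDF API, and `split_restriction` and `process_chunk` are hypothetical names. Real SDFs also support dynamic splitting and checkpointing during processing, and both are omitted here.

```python
def split_restriction(start, end, chunk_size):
    """Split the offset range [start, end) into contiguous chunks.

    Each chunk is an independent unit of work, so one input element (for
    example the single element emitted by Impulse) can fan out across workers.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    chunks = []
    pos = start
    while pos < end:
        chunks.append((pos, min(pos + chunk_size, end)))
        pos += chunk_size
    return chunks


def process_chunk(chunk, read):
    """Process one chunk by reading every offset it covers."""
    lo, hi = chunk
    return [read(offset) for offset in range(lo, hi)]


chunks = split_restriction(0, 10, 4)
print(chunks)  # [(0, 4), (4, 8), (8, 10)]
# Processing the chunks separately and concatenating matches one unsplit read.
results = [x for chunk in chunks for x in process_chunk(chunk, lambda i: i * i)]
print(results == process_chunk((0, 10), lambda i: i * i))  # True
```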

> ReduceStateByKey is a transform that is a "combinable stateful DoFn",
> i.e. the state might be created pre-shuffle; on trigger, the state is
> shuffled and then merged. In Beam we already have the CombiningState and
> MergingState facilities (sort of), which is what is needed; we just do not
> have the ability to shuffle the partial states and then combine them. This
> also relates to the inability to run stateful DoFn for merging WindowFns,
> because the same capability is needed there as well. Is this something that
> is fundamentally impossible to define for all runners? It is worth noting
> that building the state before the shuffle, then shuffling and merging it,
> requires a compatible trigger (purely based on the watermark); otherwise the
> transform falls back to a "classical DoFn".
>
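The "build state before the shuffle, shuffle it, merge it afterwards" scheme can be sketched in plain Python as a three-phase combine. `SumCombineFn` only mirrors the four-method shape of Beam's Python `CombineFn` (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`). It is not the Beam class, and `combine_with_partial_state` is a hypothetical helper. Bundles stand in for pre-shuffle workers. Triggers are ignored entirely, which corresponds to the watermark-only case mentioned in the message.

```python
from collections import defaultdict


class SumCombineFn:
    """Toy combine function shaped like Beam's Python CombineFn (not the real class).

    merge_accumulators is associative and commutative, which is what makes it
    safe to build partial state before the shuffle and merge it afterwards.
    """

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, value):
        return accumulator + value

    def merge_accumulators(self, accumulators):
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator


def combine_with_partial_state(bundles, fn):
    # Phase 1 (pre-shuffle): each bundle builds its own partial state per key.
    partials = []
    for bundle in bundles:
        local = {}
        for key, value in bundle:
            acc = local.get(key, fn.create_accumulator())
            local[key] = fn.add_input(acc, value)
        partials.append(local)
    # Phase 2 (shuffle): route every partial accumulator to its key.
    shuffled = defaultdict(list)
    for local in partials:
        for key, acc in local.items():
            shuffled[key].append(acc)
    # Phase 3 (post-shuffle): merge the partial states and emit one result per key.
    return {key: fn.extract_output(fn.merge_accumulators(accs))
            for key, accs in shuffled.items()}


bundles = [[("a", 1), ("b", 2)], [("a", 3)], [("b", 4), ("a", 5)]]
print(combine_with_partial_state(bundles, SumCombineFn()))  # {'a': 9, 'b': 6}
```

Because the merge is associative and commutative, the way elements are split into bundles does not change the result. That property is exactly what a general stateful DoFn lacks.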

Stateful DoFn for merging windows can be defined. You could require all
state to be mergeable and then it is automatic. Or you could have an
"onMerge" callback. These should both be fine. The automatic version is
less likely to have nonsensical semantics, but allowing the callback to do
"whatever it wants" whether the result is good or not is more consistent
with the design of stateful DoFn.
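The two options can be contrasted with a small plain-Python model of session windows. This is an assumption for illustration, and no Beam API is used. Each session window carries a running-sum state. When a new element's window bridges existing sessions, the automatic strategy merges their states by addition, which is well-defined because the state is mergeable. Passing a hypothetical `on_merge` callback replaces that with arbitrary logic. Here the callback is `max`, which runs fine but yields a different, arguably nonsensical, total.

```python
def add_to_sessions(sessions, timestamp, value, gap, on_merge=None):
    """Add one element to a table of session windows with running-sum state.

    sessions maps (start, end) -> state. If the element's proto-window
    [timestamp, timestamp + gap) overlaps existing sessions, they are merged
    into one window. By default their states are added (automatic merge of a
    mergeable state); a custom on_merge(states) callback may do anything else.
    """
    new_start, new_end = timestamp, timestamp + gap
    overlapping = [w for w in sessions if w[0] < new_end and new_start < w[1]]
    states = [sessions.pop(w) for w in overlapping]
    if len(states) > 1:
        base = on_merge(states) if on_merge is not None else sum(states)
    elif states:
        base = states[0]
    else:
        base = 0
    start = min([new_start] + [w[0] for w in overlapping])
    end = max([new_end] + [w[1] for w in overlapping])
    sessions[(start, end)] = base + value
    return sessions


auto, custom = {}, {}
for ts, value in [(0, 1), (12, 2), (8, 4)]:
    add_to_sessions(auto, ts, value, gap=10)
    add_to_sessions(custom, ts, value, gap=10, on_merge=max)
print(auto)    # {(0, 22): 7}
print(custom)  # {(0, 22): 6}
```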

Whether and where a shuffle takes place may vary. Start with the maths.
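One way to write down "the maths" for the combining case uses notation assumed here rather than taken from the thread. Let ⊕ merge two accumulators, let e be the empty accumulator, and let acc(B) be the accumulator built from a multiset of inputs B. Suppose ⊕ is associative and commutative with identity e, and accumulating a disjoint union equals merging the two accumulators. Then every way of partitioning the input before a shuffle gives the same merged state, which leaves the runner free to decide whether and where to shuffle. A general stateful DoFn offers no such guarantee.

```latex
\begin{aligned}
(a \oplus b) \oplus c &= a \oplus (b \oplus c) && \text{(associativity)} \\
a \oplus b &= b \oplus a && \text{(commutativity)} \\
e \oplus a &= a && \text{(identity)} \\
\mathrm{acc}(B \uplus C) &= \mathrm{acc}(B) \oplus \mathrm{acc}(C) && \text{(accumulation respects merging)} \\
\Longrightarrow\quad \mathrm{acc}(B_1) \oplus \cdots \oplus \mathrm{acc}(B_n) &= \mathrm{acc}(B_1 \uplus \cdots \uplus B_n)
\end{aligned}
```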

Kenn


> Bottom line: I'm thinking of proposing to drop the Euphoria extension,
> because it has essentially no users and actually no maintainers, but I have
> a feeling there is value in the set of operators, which could maybe be
> transferred to Beam core. I'm pretty sure it would bring value to users to
> have access to a "combining stateful DoFn" primitive (even better would be
> a "combining splittable DoFn").
>
> Looking forward to any comments on this.
>
>  Jan