I think we stated that CoGroupByKey was also a primitive, though in practice it's implemented in terms of GroupByKey today.
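For reference, "implemented in terms of GroupByKey" roughly follows a tag, Flatten, GroupByKey, untag pattern. Below is a minimal in-memory Python simulation of that pattern, not Beam code: `group_by_key` and `co_group_by_key` are hypothetical stand-ins for the transforms, and plain lists of (key, value) pairs stand in for PCollections. Value order within a group is deterministic here only because everything runs in memory; a real runner makes no such guarantee.

```python
from collections import defaultdict
from typing import Any, Dict, Hashable, List, Tuple

KV = Tuple[Hashable, Any]


def group_by_key(pairs: List[KV]) -> Dict[Hashable, List[Any]]:
    """Hypothetical stand-in for GroupByKey: collect every value per key."""
    grouped: Dict[Hashable, List[Any]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def co_group_by_key(inputs: Dict[str, List[KV]]) -> Dict[Hashable, Dict[str, List[Any]]]:
    """Hypothetical CoGroupByKey built only from Flatten + GroupByKey."""
    # 1. Tag every value with the name of the input it came from.
    # 2. Flatten: concatenate all tagged inputs into a single collection.
    flattened: List[KV] = [
        (key, (tag, value))
        for tag, pairs in inputs.items()
        for key, value in pairs
    ]
    # 3. One GroupByKey brings all tagged values for a key together.
    grouped = group_by_key(flattened)
    # 4. Untag: split each key's values back out per input (possibly empty).
    result: Dict[Hashable, Dict[str, List[Any]]] = {}
    for key, tagged_values in grouped.items():
        per_tag: Dict[str, List[Any]] = {tag: [] for tag in inputs}
        for tag, value in tagged_values:
            per_tag[tag].append(value)
        result[key] = per_tag
    return result


emails = [("amy", "amy@example.com"), ("carl", "carl@example.com")]
phones = [("amy", "111-222"), ("james", "333-444"), ("amy", "555-666")]
joined = co_group_by_key({"emails": emails, "phones": phones})
print(joined["amy"])    # {'emails': ['amy@example.com'], 'phones': ['111-222', '555-666']}
print(joined["james"])  # {'emails': [], 'phones': ['333-444']}
```

The only grouping step is the single `group_by_key` call, which is why CoGroupByKey does not need to be a separate primitive for a runner to execute it.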
On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:
>
> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> I have some missing pieces in my understanding of the set of Beam's
>> primitive transforms, which I'd like to fill. First a quick recap of what I
>> think is the current state. We have (basically) the following primitive
>> transforms:
>>
>> - DoFn (stateless, stateful, splittable)
>>
>> - Window
>>
>> - Impulse
>>
>> - GroupByKey
>>
>> - Combine
>>
>
> Not a primitive, just a well-defined transform that runners can execute in
> special ways.
>
>>
>> - Flatten (PCollections)
>>
>
> The rest, yes.
>
>> Inside runners, we most often transform GBK into ReduceFn
>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>> DoFn.
>>
>
> ReduceFnRunner is for windowing / triggers and has a special feature to use
> a CombineFn while doing it. Nothing to do with stateful DoFn.
>
>> I'll compare this to the set of transforms we used to use in Euphoria
>> (currently a Java SDK extension):
>>
>> - FlatMap ~~ stateless DoFn
>>
>> - Union ~~ Flatten
>>
>> - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>
>
> Stateful DoFn does not require an associative or commutative operation, while
> reduce/combine does. Windowing is really just a secondary key for
> GBK/Combine that allows completion of unbounded aggregations but has no
> computation associated with it.
>
>> - (missing Impulse)
>>
>
> Then you must have some primitive sources with splitting?
>
>> - (missing splittable DoFn)
>>
>
> Kind of the same question - SDF is the one and only primitive that creates
> parallelism.
>
>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn" -
>> i.e. the state might be created pre-shuffle, on trigger the state is
>> shuffled and then merged. In Beam we already have CombiningState and
>> MergingState facility (sort of), which is what is needed, we just do not
>> have the ability to shuffle the partial states and then combine them. This
>> also relates to the inability to run stateful DoFn for merging windowFns,
>> because that is needed there as well. Is this something that is
>> fundamentally impossible to define for all runners? What is worth noting is
>> that building, shuffling and merging the state before shuffle requires
>> a compatible trigger (purely based on watermark), otherwise the transform
>> falls back to "classical DoFn".
>>
>
> Stateful DoFn for merging windows can be defined. You could require all
> state to be mergeable and then it is automatic. Or you could have an
> "onMerge" callback. These should both be fine. The automatic version is
> less likely to have nonsensical semantics, but allowing the callback to do
> "whatever it wants" whether the result is good or not is more consistent
> with the design of stateful DoFn.
>
> Whether and where a shuffle takes place may vary. Start with the maths.
>
> Kenn
>
>> Bottom line: I'm thinking of proposing to drop Euphoria extension,
>> because it has essentially no users and actually no maintainers, but I have
>> a feeling there is a value in the set of operators that could be
>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>> users to have access to a "combining stateful DoFn" primitive (even better
>> would be "combining splittable DoFn").
>>
>> Looking forward to any comments on this.
>>
>> Jan
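The "combinable stateful DoFn" idea and Kenn's associativity point can be illustrated with combiner lifting. The following is a plain-Python simulation with no Beam dependency, assuming a runner that pre-combines within each bundle before the shuffle. `MeanFn` only mirrors the method names of Beam's `CombineFn` (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`); `combine_after_shuffle`, `combine_lifted`, and the list-of-bundles input are hypothetical. Both strategies give the same per-key means because merging (sum, count) pairs is associative and commutative. The last lines apply the same merge to two merging windows, which is roughly the "require all state to be mergeable" option.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple

Acc = Tuple[float, int]  # (running sum, element count)
Bundle = List[Tuple[Hashable, float]]  # hypothetical pre-shuffle partition


class MeanFn:
    """Mean with the same four-method shape as a Beam CombineFn.

    merge_accumulators is associative and commutative, which is what makes
    it safe to build partial accumulators before a shuffle and merge after.
    """

    def create_accumulator(self) -> Acc:
        return (0.0, 0)

    def add_input(self, acc: Acc, value: float) -> Acc:
        return (acc[0] + value, acc[1] + 1)

    def merge_accumulators(self, accs: Iterable[Acc]) -> Acc:
        total, count = 0.0, 0
        for s, c in accs:
            total += s
            count += c
        return (total, count)

    def extract_output(self, acc: Acc) -> float:
        return acc[0] / acc[1] if acc[1] else float("nan")


def combine_after_shuffle(fn: MeanFn, bundles: List[Bundle]) -> Dict[Hashable, float]:
    """Unlifted: shuffle every raw value, then fold each key's values."""
    grouped: Dict[Hashable, List[float]] = defaultdict(list)
    for bundle in bundles:
        for key, value in bundle:
            grouped[key].append(value)
    out: Dict[Hashable, float] = {}
    for key, values in grouped.items():
        acc = fn.create_accumulator()
        for v in values:
            acc = fn.add_input(acc, v)
        out[key] = fn.extract_output(acc)
    return out


def combine_lifted(fn: MeanFn, bundles: List[Bundle]) -> Dict[Hashable, float]:
    """Lifted: one partial accumulator per key per bundle (pre-shuffle);
    only those accumulators are shuffled, then merged per key."""
    partials: Dict[Hashable, List[Acc]] = defaultdict(list)
    for bundle in bundles:
        local: Dict[Hashable, Acc] = {}
        for key, value in bundle:
            local[key] = fn.add_input(local.get(key, fn.create_accumulator()), value)
        for key, acc in local.items():
            partials[key].append(acc)
    return {key: fn.extract_output(fn.merge_accumulators(accs))
            for key, accs in partials.items()}


fn = MeanFn()
bundles = [[("a", 1.0), ("b", 10.0), ("a", 3.0)], [("a", 5.0), ("b", 20.0)]]
print(combine_after_shuffle(fn, bundles))  # {'a': 3.0, 'b': 15.0}
print(combine_lifted(fn, bundles))         # {'a': 3.0, 'b': 15.0}

# Merging windows: when two session windows for one key merge, their
# accumulators merge the same way, without revisiting the raw elements.
left = fn.add_input(fn.add_input(fn.create_accumulator(), 1.0), 3.0)
right = fn.add_input(fn.create_accumulator(), 8.0)
print(fn.extract_output(fn.merge_accumulators([left, right])))  # 4.0
```

A stateful DoFn whose per-key state update depends on arrival order (say, remembering only the first element) has no merge function like this, which is why it cannot be lifted this way in general.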