On Mon, Oct 24, 2022 at 5:50 AM Jan Lukavský <je...@seznam.cz> wrote:
> On 10/22/22 21:47, Reuven Lax via dev wrote:
>
> I think we stated that CoGroupByKey was also a primitive, though in
> practice it's implemented in terms of GroupByKey today.
>
> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>>
>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> I have some missing pieces in my understanding of the set of Beam's
>>> primitive transforms, which I'd like to fill in. First, a quick recap of
>>> what I think is the current state. We have (basically) the following
>>> primitive transforms:
>>>
>>> - DoFn (stateless, stateful, splittable)
>>>
>>> - Window
>>>
>>> - Impulse
>>>
>>> - GroupByKey
>>>
>>> - Combine
>>>
>>
>> Not a primitive, just a well-defined transform that runners can execute
>> in special ways.
>>
> Yep, OK, agree. Performance is orthogonal to semantics.
>
>
>>
>>>
>>>
>>> - Flatten (PCollections)
>>>
>>
>> The rest, yes.
>>
>>
>>
>>> Inside runners, we most often transform GBK into ReduceFn
>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>> DoFn.
>>>
>>
>> ReduceFnRunner is for windowing / triggers and has a special feature to
>> use a CombineFn while doing it. Nothing to do with stateful DoFn.
>>
> My bad, wrong wording. The point was that *all* of the semantics of GBK
> and Combine can be defined in terms of stateful DoFn. There are some
> changes needed to stateful DoFn to support the Combine functionality. But,
> as mentioned above, optimization is orthogonal to semantics.
>

Yes, though we would need Multimap state to do it properly, which isn't yet
available on all runners.
(You could model it _very_ inefficiently with BagState, but that would be quite bad.)

>
>>
>>
>>> I'll compare this to the set of transforms we used to use in Euphoria
>>> (currently a Java SDK extension):
>>>
>>> - FlatMap ~~ stateless DoFn
>>>
>>> - Union ~~ Flatten
>>>
>>> - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>
>>
>> Stateful DoFn does not require an associative or commutative operation,
>> while reduce/combine does. Windowing is really just a secondary key for
>> GBK/Combine that allows completion of unbounded aggregations but has no
>> computation associated with it.
>>
> Merging WindowFn contains some computation. The fact that stateful DoFn
> does not require a specific form of reduce function is precisely what
> makes it the actual primitive, no?
>
>
>>
>>
>>> - (missing Impulse)
>>>
>>
>> Then you must have some primitive sources with splitting?
>>
>>
>>> - (missing splittable DoFn)
>>>
>>
>> Kind of the same question: SDF is the one and only primitive that
>> creates parallelism.
>>
> Original Euphoria had an analogy to (Un)boundedReader. The SDK extension
> in Beam works on top of PCollections and therefore does not deal with IOs.
>
>
>>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn",
>>> i.e. the state might be created pre-shuffle and, on trigger, the state is
>>> shuffled and then merged. In Beam we already have (sort of) the
>>> CombiningState and MergingState facilities, which is what is needed; we
>>> just do not have the ability to shuffle the partial states and then
>>> combine them. This also relates to the inability to run stateful DoFns for
>>> merging WindowFns, because that is needed there as well. Is this something
>>> that is fundamentally impossible to define for all runners? What is worth
>>> noting is that building the state before the shuffle, shuffling it and
>>> merging it requires a compatible trigger (purely watermark-based);
>>> otherwise the transform falls back to a "classical" DoFn.
>>>
>>>
>>
>> Stateful DoFn for merging windows can be defined. You could require all
>> state to be mergeable and then it is automatic. Or you could have an
>> "onMerge" callback. These should both be fine. The automatic version is
>> less likely to have nonsensical semantics, but allowing the callback to do
>> "whatever it wants", whether the result is good or not, is more consistent
>> with the design of stateful DoFn.
>>
> Yes, but this is the same for CombineFn, right? The merge (or combine) has
> to be correctly aligned with the computation. The current situation is that
> we do not support stateful DoFns for merging WindowFns [1].
>
>
>> Whether and where a shuffle takes place may vary. Start with the maths.
>>
> Shuffle happens at least whenever there is a need to regroup keys. I'm not
> sure which maths you are referring to; can you clarify, please?
>
> Jan
>
> [1]
> https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
>
>
>> Kenn
>>
>>
>>> Bottom line: I'm thinking of proposing to drop the Euphoria extension,
>>> because it has essentially no users and actually no maintainers, but I
>>> have a feeling there is value in the set of operators that could be
>>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>>> users to have access to a "combining stateful DoFn" primitive (even better
>>> would be a "combining splittable DoFn").
>>>
>>> Looking forward to any comments on this.
>>>
>>> Jan
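The "combinable stateful DoFn" idea discussed in this thread (partial per-key state built before the shuffle, then shuffled and merged) can be illustrated with a tiny plain-Java model. This is only a sketch under stated assumptions, not Beam's API and not how any runner implements it: CombineLike, SumCount, MeanFn, preShuffle, shuffleAndMerge and sequential are hypothetical names, loosely mirroring the createAccumulator / addInput / mergeAccumulators / extractOutput shape of Beam's CombineFn, and the "shuffle" is just an in-memory group-by-key over the partial states.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a "combinable stateful DoFn". NOT Beam's API:
// CombineLike, SumCount, MeanFn, preShuffle, shuffleAndMerge and sequential are illustrative names.
public class CombinableStateSketch {

    // Assumed CombineFn-like contract: create, add, merge, extract.
    interface CombineLike<InT, AccT, OutT> {
        AccT createAccumulator();
        AccT addInput(AccT acc, InT input);
        AccT mergeAccumulators(List<AccT> accs);
        OutT extractOutput(AccT acc);
    }

    // Immutable accumulator for a per-key mean: running sum and count.
    static final class SumCount {
        final long sum;
        final long count;
        SumCount(long sum, long count) { this.sum = sum; this.count = count; }
    }

    // Example combine: mean of integers. Merging sums and counts is associative and commutative.
    static final class MeanFn implements CombineLike<Integer, SumCount, Double> {
        public SumCount createAccumulator() { return new SumCount(0, 0); }
        public SumCount addInput(SumCount acc, Integer x) { return new SumCount(acc.sum + x, acc.count + 1); }
        public SumCount mergeAccumulators(List<SumCount> accs) {
            long s = 0, c = 0;
            for (SumCount a : accs) { s += a.sum; c += a.count; }
            return new SumCount(s, c);
        }
        public Double extractOutput(SumCount acc) {
            return acc.count == 0 ? Double.NaN : (double) acc.sum / acc.count;
        }
    }

    // Pre-shuffle: one bundle builds partial per-key state locally.
    static <K, InT, AccT> Map<K, AccT> preShuffle(
            List<Map.Entry<K, InT>> bundle, CombineLike<InT, AccT, ?> fn) {
        Map<K, AccT> partial = new HashMap<>();
        for (Map.Entry<K, InT> e : bundle) {
            AccT acc = partial.getOrDefault(e.getKey(), fn.createAccumulator());
            partial.put(e.getKey(), fn.addInput(acc, e.getValue()));
        }
        return partial;
    }

    // Simulated shuffle: group the partial states by key, then merge them and extract outputs.
    static <K, InT, AccT, OutT> Map<K, OutT> shuffleAndMerge(
            List<Map<K, AccT>> partials, CombineLike<InT, AccT, OutT> fn) {
        Map<K, List<AccT>> grouped = new HashMap<>();
        for (Map<K, AccT> p : partials) {
            for (Map.Entry<K, AccT> e : p.entrySet()) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        Map<K, OutT> out = new HashMap<>();
        for (Map.Entry<K, List<AccT>> e : grouped.entrySet()) {
            out.put(e.getKey(), fn.extractOutput(fn.mergeAccumulators(e.getValue())));
        }
        return out;
    }

    // Fallback path: no pre-shuffle combining; all inputs for a key are folded in one place.
    static <K, InT, AccT, OutT> Map<K, OutT> sequential(
            List<Map.Entry<K, InT>> all, CombineLike<InT, AccT, OutT> fn) {
        Map<K, OutT> out = new HashMap<>();
        preShuffle(all, fn).forEach((k, acc) -> out.put(k, fn.extractOutput(acc)));
        return out;
    }

    public static void main(String[] args) {
        MeanFn fn = new MeanFn();
        // Two "bundles" that would be processed on different workers before the shuffle.
        List<Map.Entry<String, Integer>> b1 = List.of(Map.entry("a", 1), Map.entry("a", 3), Map.entry("b", 10));
        List<Map.Entry<String, Integer>> b2 = List.of(Map.entry("a", 5), Map.entry("b", 20));
        Map<String, Double> result = shuffleAndMerge(List.of(preShuffle(b1, fn), preShuffle(b2, fn)), fn);
        System.out.println(result.get("a") + " " + result.get("b")); // prints "3.0 15.0"
    }
}
```

The pre-shuffle path only agrees with the sequential fallback because mergeAccumulators here is associative and commutative; an arbitrary stateful DoFn carries no such guarantee, which is the distinction between stateful DoFn and reduce/combine drawn earlier in the thread.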