On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> Not quite IMO. It is a subtle difference. Perhaps these transforms can
> be *implemented* using stateful DoFn, but defining their semantics directly
> at a high level is more powerful. The higher level we can make transforms,
> the more flexibility we have in the runners. You *could* suggest that we
> take the same approach as we do with Combine: not a primitive, but a
> special transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
> Yes, semantics > optimizations. For optimizations Beam already has a
> facility - PTransformOverride. There is no fundamental difference about how
> we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart
> runners will not use that expansion". This is essentially the root of this
> discussion.
>
> If I rephrase it:
>
> a) why do we distinguish between "some" actually composite transforms
> treating them as primitive, while others have expansions, although the
> fundamental reasoning seems the same for both (performance)?
>
> b) is there a fundamental reason why we do not support stateful DoFn for
> merging windows?
>

Mostly because we would need the API to include a merge capability, and
that has never been implemented.

> I feel that these are related and have historical reasons, but I'd like to
> know that for sure. :)
>
> Jan
> On 10/24/22 19:59, Kenneth Knowles wrote:
>
> On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> On 10/22/22 21:47, Reuven Lax via dev wrote:
>>
>> I think we stated that CoGroupByKey was also a primitive, though in
>> practice it's implemented in terms of GroupByKey today.
>>
>> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have some missing pieces in my understanding of the set of Beam's
>>>> primitive transforms, which I'd like to fill. First a quick recap of what I
>>>> think is the current state. We have (basically) the following primitive
>>>> transforms:
>>>>
>>>> - DoFn (stateless, stateful, splittable)
>>>>
>>>> - Window
>>>>
>>>> - Impulse
>>>>
>>>> - GroupByKey
>>>>
>>>> - Combine
>>>>
>>> Not a primitive, just a well-defined transform that runners can execute
>>> in special ways.
>>>
>> Yep, OK, agree. Performance is orthogonal to semantics.
>>
>>>> - Flatten (PCollections)
>>>>
>>> The rest, yes.
>>>
>>>> Inside runners, we most often transform GBK into ReduceFn
>>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>>> DoFn.
>>>>
>>> ReduceFnRunner is for windowing / triggers and has a special feature to
>>> use a CombineFn while doing it. Nothing to do with stateful DoFn.
>>>
>> My bad, wrong wording. The point was that *all* of the semantics of GBK
>> and Combine can be defined in terms of stateful DoFn. There are some
>> changes needed to stateful DoFn to support the Combine functionality. But
>> as mentioned above - optimization is orthogonal to semantics.
>>
>
> Not quite IMO. It is a subtle difference. Perhaps these transforms can be
> *implemented* using stateful DoFn, but defining their semantics directly at
> a high level is more powerful. The higher level we can make transforms, the
> more flexibility we have in the runners. You *could* suggest that we take
> the same approach as we do with Combine: not a primitive, but a special
> transform that we optimize.
> You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
>>>> I'll compare this to the set of transforms we used to use in Euphoria
>>>> (currently a Java SDK extension):
>>>>
>>>> - FlatMap ~~ stateless DoFn
>>>>
>>>> - Union ~~ Flatten
>>>>
>>>> - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>>
>>> Stateful DoFn does not require an associative or commutative operation,
>>> while reduce/combine does. Windowing is really just a secondary key for
>>> GBK/Combine that allows completion of unbounded aggregations but has no
>>> computation associated with it.
>>>
>> Merging WindowFn contains some computation. The fact that stateful DoFn
>> does not require a specific form of reduce function is precisely what makes
>> it the actual primitive, no?
>>
>>>> - (missing Impulse)
>>>>
>>> Then you must have some primitive sources with splitting?
>>>
>>>> - (missing splittable DoFn)
>>>>
>>> Kind of the same question - SDF is the one and only primitive that
>>> creates parallelism.
>>>
>> Original Euphoria had an analogy to (Un)boundedReader. The SDK extension
>> in Beam works on top of PCollections and therefore does not deal with IOs.
>>
>>>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn"
>>>> - i.e. the state might be created pre-shuffle, on trigger the state is
>>>> shuffled and then merged. In Beam we already have CombiningState and
>>>> MergingState facility (sort of), which is what is needed, we just do not
>>>> have the ability to shuffle the partial states and then combine them. This
>>>> also relates to the inability to run stateful DoFn for merging WindowFns,
>>>> because that is needed there as well.
>>>> Is this something that is
>>>> fundamentally impossible to define for all runners? What is worth noting is
>>>> that building, shuffling and merging the state before shuffle requires
>>>> a compatible trigger (purely based on watermark), otherwise the transform
>>>> falls back to "classical DoFn".
>>>>
>>> Stateful DoFn for merging windows can be defined. You could require all
>>> state to be mergeable and then it is automatic. Or you could have an
>>> "onMerge" callback. These should both be fine. The automatic version is
>>> less likely to have nonsensical semantics, but allowing the callback to do
>>> "whatever it wants" whether the result is good or not is more consistent
>>> with the design of stateful DoFn.
>>>
>> Yes, but this is the same for CombineFn, right? The merge (or combine)
>> has to be correctly aligned with the computation. The current situation is
>> that we do not support stateful DoFns for merging WindowFn [1].
>>
>>> Whether and where a shuffle takes place may vary. Start with the maths.
>>>
>> Shuffle happens at least whenever there is a need to regroup keys. I'm
>> not sure which maths you refer to, can you clarify please?
>>
>> Jan
>>
>> [1]
>> https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
>>
>>> Kenn
>>>
>>>> Bottom line: I'm thinking of proposing to drop the Euphoria extension,
>>>> because it has essentially no users and actually no maintainers, but I have
>>>> a feeling there is value in the set of operators that could be
>>>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>>>> users to have access to a "combining stateful DoFn" primitive (even better
>>>> would be "combining splittable DoFn").
>>>>
>>>> Looking forward to any comments on this.
>>>>
>>>> Jan
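
For readers following the thread, the "combining stateful DoFn" idea and
the merge capability needed for merging windows can be illustrated with a
small plain-Python sketch. It does not use the Beam API and is not how any
runner implements this. The four accumulator functions borrow their names
from CombineFn; pre_shuffle, shuffle_and_merge and merge_sessions are
hypothetical helpers invented for this illustration, and the overlap rule
in merge_sessions is a simplified stand-in for a merging WindowFn.

```python
# Illustration only: plain Python, no Beam dependency.
# Accumulator here is (sum, count), so the final output is a mean.

def create_accumulator():
    return (0, 0)

def add_input(acc, value):
    return (acc[0] + value, acc[1] + 1)

def merge_accumulators(a, b):
    # Must be associative and commutative for partial merging to be valid.
    return (a[0] + b[0], a[1] + b[1])

def extract_output(acc):
    return acc[0] / acc[1]

def pre_shuffle(bundle):
    """Hypothetical helper: build partial per-key state on one worker,
    before any shuffle has happened."""
    partial = {}
    for key, value in bundle:
        partial[key] = add_input(partial.get(key, create_accumulator()), value)
    return partial

def shuffle_and_merge(partials):
    """Hypothetical helper: regroup the partial states by key and merge them."""
    merged = {}
    for partial in partials:
        for key, acc in partial.items():
            if key in merged:
                merged[key] = merge_accumulators(merged[key], acc)
            else:
                merged[key] = acc
    return merged

def merge_sessions(state_by_window):
    """Hypothetical helper: merge overlapping [start, end) windows for a
    single key, merging their state with the same merge function."""
    merged = []
    for (start, end), acc in sorted(state_by_window.items()):
        if merged and start < merged[-1][0][1]:
            (prev_start, prev_end), prev_acc = merged[-1]
            merged[-1] = (
                (prev_start, max(prev_end, end)),
                merge_accumulators(prev_acc, acc),
            )
        else:
            merged.append(((start, end), acc))
    return dict(merged)
```

The point of the sketch is that one merge function serves both purposes:
it combines partial states built before the shuffle, and it combines
per-window states when windows merge. That corresponds to the "require
all state to be mergeable" option mentioned above; an "onMerge" callback
would instead let user code replace merge_accumulators with arbitrary
logic.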