I somehow missed these answers, Reuven and Kenn. Thanks for the discussion; it helped me clarify my understanding.

 Jan

On 10/26/22 21:10, Kenneth Knowles wrote:


On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:

    > Not quite IMO. It is a subtle difference. Perhaps these
    transforms can be *implemented* using stateful DoFn, but defining
    their semantics directly at a high level is more powerful. The
    higher level we can make transforms, the more flexibility we have
    in the runners. You *could* suggest that we take the same approach
    as we do with Combine: not a primitive, but a special transform
    that we optimize. You could say that "vanilla ParDo" is a
    composite that has a stateful ParDo implementation, but a runner
    can implement the composite more efficiently (without a shuffle).
    Same with CoGBK. You could say that there is a default expansion
    of CoGBK that uses stateful DoFn (which implies a shuffle) but
    that smart runners will not use that expansion.

    Yes, semantics > optimizations. For optimizations Beam already has
    a facility: PTransformOverride. There is no fundamental
    difference in how we treat Combine compared to GBK. It *can* be
    expanded using GBK, but "smart runners will not use that
    expansion". This is essentially the root of this discussion.

    If I rephrase it:

     a) why do we distinguish between "some" transforms that are
    actually composite, treating them as primitives, while others have
    expansions, although the fundamental reasoning seems the same for
    both (performance)?

It is identical to why you can choose different axioms for formal logic and get all the same provable statements. You have to choose something. But certainly a runner that just executes primitives is the bare minimum, and all runners are really expected to take advantage of known composites. Before portability, there was minimal benefit in having the runner (written in Java) execute a transform directly instead of calling a user DoFn. Now, with portability, the benefit could be huge if it avoids a Fn API crossing.
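The "known composites" point can be illustrated with a minimal sketch, assuming a hypothetical planner written in stdlib Java (this is not Beam's runner code): a composite is executed directly when the runner recognizes its URN, and is otherwise replaced by its default expansion, recursively, down to primitives. The URN strings imitate Beam's naming, and modeling Combine.perKey as GroupByKey followed by a ParDo is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical transform node: a URN plus an optional expansion into sub-transforms.
// A node with an empty expansion is treated as a primitive.
class TransformNode {
    final String urn;
    final List<TransformNode> expansion;

    TransformNode(String urn, List<TransformNode> expansion) {
        this.urn = urn;
        this.expansion = expansion;
    }

    static TransformNode primitive(String urn) {
        return new TransformNode(urn, List.of());
    }
}

// Hypothetical runner planner: executes known composites natively, otherwise
// recurses into their expansions until it reaches primitives.
class ToyRunner {
    private final Set<String> nativeComposites;

    ToyRunner(Set<String> nativeComposites) {
        this.nativeComposites = nativeComposites;
    }

    // Returns the URNs the runner would execute, in pipeline order.
    List<String> plan(TransformNode node) {
        List<String> executed = new ArrayList<>();
        visit(node, executed);
        return executed;
    }

    private void visit(TransformNode node, List<String> executed) {
        if (node.expansion.isEmpty() || nativeComposites.contains(node.urn)) {
            executed.add(node.urn); // primitive, or a composite this runner runs directly
        } else {
            for (TransformNode child : node.expansion) {
                visit(child, executed); // unknown composite: use its default expansion
            }
        }
    }
}
```

A runner that supports only primitives plans the GroupByKey and the ParDo; a runner that recognizes the Combine URN executes it as a single step, which is where combiner lifting or avoiding a Fn API crossing would happen.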

     b) is there a fundamental reason why we do not support stateful
    DoFn for merging windows?

No reason. The original design was to force users to use only "mergeable" state in a stateful DoFn for merging windows. That is an annoying restriction that we don't really need, so I think the best way is to have an OnMerge callback. The internal legacy Java APIs for this are way too complex. But the portability wire protocols support it (I think?), and making a good user-facing API for all the SDKs shouldn't be too hard.

Kenn

    I feel that these are related and have historical reasons, but I'd
    like to know that for sure. :)

     Jan

    On 10/24/22 19:59, Kenneth Knowles wrote:


    On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský <je...@seznam.cz> wrote:

        On 10/22/22 21:47, Reuven Lax via dev wrote:
        I think we stated that CoGroupByKey was also a primitive,
        though in practice it's implemented in terms of GroupByKey
        today.
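As a rough illustration of "implemented in terms of GroupByKey", here is a stdlib-only Java sketch of a CoGroupByKey expansion: tag each input with its index, Flatten, GroupByKey, then split each group back by tag. The class and method names are hypothetical; the sketch mirrors the general union-tag idea and is not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CoGbkSketch {
    // A value tagged with the index of the input it came from (a "union tag").
    static final class Tagged {
        final int tag;
        final String value;

        Tagged(int tag, String value) {
            this.tag = tag;
            this.value = value;
        }
    }

    // Returns, per key, one list of values for each input, in input order.
    static Map<String, List<List<String>>> coGroupByKey(List<List<Map.Entry<String, String>>> inputs) {
        // 1 + 2: tag every element with its input index and flatten into one collection.
        List<Map.Entry<String, Tagged>> flattened = new ArrayList<>();
        for (int tag = 0; tag < inputs.size(); tag++) {
            for (Map.Entry<String, String> e : inputs.get(tag)) {
                flattened.add(Map.entry(e.getKey(), new Tagged(tag, e.getValue())));
            }
        }
        // 3: GroupByKey over the flattened collection (the only shuffle).
        Map<String, List<Tagged>> grouped = new TreeMap<>();
        for (Map.Entry<String, Tagged> e : flattened) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        // 4: split each group back into per-input lists using the tags.
        Map<String, List<List<String>>> result = new TreeMap<>();
        for (Map.Entry<String, List<Tagged>> g : grouped.entrySet()) {
            List<List<String>> perInput = new ArrayList<>();
            for (int i = 0; i < inputs.size(); i++) {
                perInput.add(new ArrayList<>());
            }
            for (Tagged t : g.getValue()) {
                perInput.get(t.tag).add(t.value);
            }
            result.put(g.getKey(), perInput);
        }
        return result;
    }
}
```

Because only one GroupByKey is involved, a runner that treats CoGroupByKey as a primitive gains flexibility rather than new semantics.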

        On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles
        <k...@apache.org> wrote:



            On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
            <je...@seznam.cz> wrote:

                Hi,

                I have some missing pieces in my understanding of
                the set of Beam's primitive transforms, which I'd
                like to fill in. First, a quick recap of what I think
                is the current state. We have (basically) the
                following primitive transforms:

                 - DoFn (stateless, stateful, splittable)

                 - Window

                 - Impulse

                 - GroupByKey

                 - Combine


            Not a primitive, just a well-defined transform that
            runners can execute in special ways.

        Yep, OK, agree. Performance is orthogonal to semantics.



                 - Flatten (PCollections)


            The rest, yes.

                Inside runners, we most often transform GBK into
                ReduceFn (ReduceFnRunner), which does the actual
                logic for both GBK and stateful DoFn.


            ReduceFnRunner is for windowing / triggers and has a
            special feature to use a CombineFn while doing it.
            Nothing to do with stateful DoFn.

        My bad, wrong wording. The point was that *all* of the
        semantics of GBK and Combine can be defined in terms of
        stateful DoFn. Some changes to stateful DoFn would be needed
        to support the Combine functionality. But, as mentioned
        above, optimization is orthogonal to semantics.
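To make "GBK semantics defined in terms of stateful DoFn" concrete, here is a stdlib-only sketch that models a stateful DoFn holding one bag of values per key and fixed window, flushed when the watermark reaches the end of the window (standing in for an event-time timer). It assumes fixed windows, the default trigger and no allowed lateness; the class is hypothetical and is not Beam's StatefulDoFnRunner.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of GroupByKey written as a stateful DoFn: one "bag" of values
// per key and fixed window, emitted when the watermark passes the window end.
class GbkAsStatefulDoFn {
    private final long windowSize;
    // key -> window start -> buffered values (the bag state cell for that key and window)
    private final Map<String, Map<Long, List<String>>> bags = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    GbkAsStatefulDoFn(long windowSize) {
        this.windowSize = windowSize;
    }

    // processElement: assign the fixed window and append the value to that window's bag.
    void processElement(String key, long timestamp, String value) {
        long windowStart = Math.floorDiv(timestamp, windowSize) * windowSize;
        bags.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(windowStart, w -> new ArrayList<>())
            .add(value);
    }

    // Stand-in for an event-time timer: once the watermark reaches a window's end,
    // emit the grouped values for that key and window and clear the bag.
    void advanceWatermark(long watermark) {
        for (Map.Entry<String, Map<Long, List<String>>> perKey : bags.entrySet()) {
            Iterator<Map.Entry<Long, List<String>>> cells = perKey.getValue().entrySet().iterator();
            while (cells.hasNext()) {
                Map.Entry<Long, List<String>> cell = cells.next();
                if (cell.getKey() + windowSize <= watermark) {
                    output.add(perKey.getKey() + "@" + cell.getKey() + "=" + cell.getValue());
                    cells.remove();
                }
            }
        }
    }

    List<String> output() {
        return output;
    }
}
```

Nothing in this model requires the buffered values to be combinable, which is the sense in which stateful DoFn is the more general building block; Combine would additionally need a merge step on partial state.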


    Not quite IMO. It is a subtle difference. Perhaps these
    transforms can be *implemented* using stateful DoFn, but defining
    their semantics directly at a high level is more powerful. The
    higher level we can make transforms, the more flexibility we have
    in the runners. You *could* suggest that we take the same
    approach as we do with Combine: not a primitive, but a special
    transform that we optimize. You could say that "vanilla ParDo" is
    a composite that has a stateful ParDo implementation, but a
    runner can implement the composite more efficiently (without a
    shuffle). Same with CoGBK. You could say that there is a default
    expansion of CoGBK that uses stateful DoFn (which implies a
    shuffle) but that smart runners will not use that expansion.

                I'll compare this to the set of transforms we used
                in Euphoria (currently a Java SDK extension):

                 - FlatMap ~~ stateless DoFn

                 - Union ~~ Flatten

                 - ReduceStateByKey ~~ stateful DoFn, GBK, Combine,
                Window


            Stateful DoFn does not require an associative or
            commutative operation, while reduce/combine does.
            Windowing is really just a secondary key for GBK/Combine
            that allows completion of unbounded aggregations but has
            no computation associated with it.
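A small Java sketch of why the associativity requirement matters: a Combine may be evaluated as partial results per bundle that are then combined, while a stateful DoFn simply folds elements one at a time. Bundling is simulated with nested lists, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.function.LongBinaryOperator;

class AssociativityCheck {
    // One sequential fold, as a stateful DoFn would do it: element by element.
    static long sequential(List<Long> values, long identity, LongBinaryOperator op) {
        long acc = identity;
        for (long v : values) {
            acc = op.applyAsLong(acc, v);
        }
        return acc;
    }

    // Combine-style evaluation: fold each bundle independently, then fold the partial results.
    static long partitioned(List<List<Long>> bundles, long identity, LongBinaryOperator op) {
        long acc = identity;
        for (List<Long> bundle : bundles) {
            acc = op.applyAsLong(acc, sequential(bundle, identity, op));
        }
        return acc;
    }
}
```

For the values 10, 3, 2, 1 split into bundles [10, 3] and [2, 1], addition gives 16 either way, but subtraction gives -16 sequentially and 16 when bundled. That is why a Combine requires associativity (and commutativity, since partial results can arrive in any order), whereas a stateful DoFn places no algebraic requirement on the user's code.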

        A merging WindowFn does involve some computation. The fact
        that stateful DoFn does not require a specific form of reduce
        function is precisely what makes it the actual primitive, no?


                 - (missing Impulse)


            Then you must have some primitive sources with splitting?

                 - (missing splittable DoFn)


            Kind of the same question - SDF is the one and only
            primitive that creates parallelism.
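Since SDF is the primitive that creates parallelism, a minimal sketch of the core idea may help: a restriction (here an offset range) is claimed position by position, and its unclaimed remainder can be split off during processing so that another worker can take it. This is a hypothetical simplification, not the API of Beam's OffsetRangeTracker.

```java
// Hypothetical, simplified offset-range restriction (not Beam's OffsetRangeTracker API).
class OffsetRestriction {
    final long from;
    long to; // exclusive; shrinks when the remainder is split off
    private long lastClaimed;

    OffsetRestriction(long from, long to) {
        this.from = from;
        this.to = to;
        this.lastClaimed = from - 1;
    }

    // Claims the next offset to process; false means this restriction is done.
    boolean tryClaim(long offset) {
        if (offset <= lastClaimed) {
            throw new IllegalArgumentException("claims must be increasing");
        }
        if (offset >= to) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    // Dynamic split: keeps [from, splitPoint) and returns [splitPoint, to) as a residual
    // that another worker can process in parallel; null if there is nothing to split off.
    OffsetRestriction trySplit(double fractionOfRemainder) {
        long start = lastClaimed + 1;
        long splitPoint = start + (long) ((to - start) * fractionOfRemainder);
        if (splitPoint < start || splitPoint >= to) {
            return null;
        }
        OffsetRestriction residual = new OffsetRestriction(splitPoint, to);
        to = splitPoint;
        return residual;
    }
}
```

After claiming offsets 0 to 9 of [0, 100), splitting half of the remainder keeps [0, 55) in the current worker and hands [55, 100) to another one.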

        The original Euphoria had an analogue of (Un)boundedReader.
        The SDK extension in Beam works on top of PCollections and
        therefore does not deal with IOs.


                ReduceStateByKey is a transform that is a
                "combinable stateful DoFn", i.e. the state may be
                created pre-shuffle; on trigger, the state is
                shuffled and then merged. In Beam we already have
                the CombiningState and MergingState facilities (sort
                of), which is what is needed; we just do not have the
                ability to shuffle the partial states and then
                combine them. This also relates to the inability to
                run stateful DoFns with merging WindowFns, because
                the same ability is needed there. Is this something
                that is fundamentally impossible to define for all
                runners? It is worth noting that building the state
                before the shuffle, then shuffling and merging it,
                requires a compatible trigger (one purely based on
                the watermark); otherwise the transform falls back
                to a "classical DoFn".

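A stdlib-only Java sketch of the "combinable stateful DoFn" idea: partial state is built per key inside each bundle, only the partial states are shuffled, and they are merged afterwards; without a watermark-only trigger the sketch falls back to shuffling raw elements. MergeableState, DistinctCount and ReduceStateByKeySketch are hypothetical names (not Euphoria's or Beam's API), and the trigger is reduced to a boolean flag.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Supplier;

// Hypothetical contract for state that can be built before a shuffle and merged after it.
interface MergeableState<I, S> {
    void add(I input);
    void mergeFrom(S other);
}

// Example state: element count plus the set of distinct values seen.
class DistinctCount implements MergeableState<String, DistinctCount> {
    long count;
    final Set<String> distinct = new TreeSet<>();

    public void add(String input) {
        count++;
        distinct.add(input);
    }

    public void mergeFrom(DistinctCount other) {
        count += other.count;
        distinct.addAll(other.distinct);
    }
}

class ReduceStateByKeySketch {
    // Number of records that crossed the simulated shuffle in the last run.
    static int shuffledRecords;

    static <S extends MergeableState<String, S>> Map<String, S> run(
            List<List<Map.Entry<String, String>>> bundles, Supplier<S> newState, boolean watermarkOnlyTrigger) {
        shuffledRecords = 0;
        Map<String, S> result = new TreeMap<>();
        for (List<Map.Entry<String, String>> bundle : bundles) {
            if (!watermarkOnlyTrigger) {
                // Fallback ("classical" path): every raw element is shuffled, state is built afterwards.
                for (Map.Entry<String, String> e : bundle) {
                    shuffledRecords++;
                    result.computeIfAbsent(e.getKey(), k -> newState.get()).add(e.getValue());
                }
                continue;
            }
            // Combinable path: build partial state per key inside the bundle (pre-shuffle) ...
            Map<String, S> partial = new TreeMap<>();
            for (Map.Entry<String, String> e : bundle) {
                partial.computeIfAbsent(e.getKey(), k -> newState.get()).add(e.getValue());
            }
            // ... then shuffle only the partial states and merge them per key.
            for (Map.Entry<String, S> p : partial.entrySet()) {
                shuffledRecords++;
                result.computeIfAbsent(p.getKey(), k -> newState.get()).mergeFrom(p.getValue());
            }
        }
        return result;
    }
}
```

With bundles [(a,x), (a,y), (a,x)] and [(a,z), (b,x)], the combinable path shuffles 3 records instead of 5 and yields the same state (key a: count 4, distinct {x, y, z}). Both paths agree only because mergeFrom is consistent with add, which is the "merge must be aligned with the computation" concern raised below.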

            Stateful DoFn for merging windows can be defined. You
            could require all state to be mergeable and then it is
            automatic. Or you could have an "onMerge" callback.
            These should both be fine. The automatic version is less
            likely to have nonsensical semantics, but allowing the
            callback to do "whatever it wants" whether the result is
            good or not is more consistent with the design of
            stateful DoFn.
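The two options can be sketched with a hypothetical, stdlib-only model of one key's session windows, each carrying user state; whenever sessions merge, a merge function is applied to their states. Under the "all state mergeable" option that function would come from the state type itself (for example the CombineFn behind a combining state), while an onMerge callback would be arbitrary user code; the sketch simply takes it as a parameter. None of these names are Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model (not a Beam API): the session windows of a single key, each
// carrying user state of type S. When sessions merge, their states are merged with
// `merge`, standing for either the state type's own merge or a user onMerge callback.
class MergingWindowState<S> {
    static final class Session<T> {
        final long start; // inclusive
        final long end;   // exclusive
        final T state;

        Session(long start, long end, T state) {
            this.start = start;
            this.end = end;
            this.state = state;
        }
    }

    private final long gap;
    private final BinaryOperator<S> merge;
    private final List<Session<S>> sessions = new ArrayList<>();

    MergingWindowState(long gap, BinaryOperator<S> merge) {
        this.gap = gap;
        this.merge = merge;
    }

    // Assigns the element the window [ts, ts + gap) and merges it with every overlapping
    // session. Existing sessions never overlap each other, so one pass is enough.
    void add(long ts, S elementState) {
        Session<S> current = new Session<>(ts, ts + gap, elementState);
        List<Session<S>> untouched = new ArrayList<>();
        for (Session<S> s : sessions) {
            if (s.start < current.end && current.start < s.end) {
                current = new Session<>(Math.min(s.start, current.start), Math.max(s.end, current.end),
                        merge.apply(s.state, current.state));
            } else {
                untouched.add(s);
            }
        }
        untouched.add(current);
        sessions.clear();
        sessions.addAll(untouched);
    }

    List<Session<S>> sessions() {
        return sessions;
    }
}
```

For example, `Long::sum` merges per-session counts the way a combining state would, while `Long::max` over a "latest timestamp" value is the kind of choice a plain value cell has no built-in merge semantics for, but a user callback can make.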

        Yes, but this is the same for CombineFn, right? The merge (or
        combine) has to be correctly aligned with the computation.
        The current situation is that we do not support stateful
        DoFns for merging WindowFns [1].


            Whether and where a shuffle takes place may vary. Start
            with the maths.

        A shuffle happens at least whenever elements need to be
        regrouped by key. I'm not sure which maths you are referring
        to; could you please clarify?

         Jan

        [1] https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106


            Kenn

                Bottom line: I'm thinking of proposing to drop the
                Euphoria extension, because it has essentially no
                users and no actual maintainers, but I have a
                feeling there is value in its set of operators that
                could maybe be transferred to Beam core. I'm
                pretty sure it would bring value to users to have
                access to a "combining stateful DoFn" primitive
                (even better would be a "combining splittable DoFn").

                Looking forward to any comments on this.

                 Jan
