Understood. I think I can put together a design document describing the API changes that would be required to implement this. Unfortunately, I don't have the capacity to implement it myself (mostly because I don't actually have a use case for it). This discussion was meant as a kind of theoretical exercise to see whether we can find benefits in viewing Combine and GroupByKey as special cases of stateful DoFns. I think there are two direct consequences of that:

 a) it might be a little easier to implement a new runner (implementing stateless and stateful ParDos might suffice to get "at least something up and running")

 b) it opens up some optimizations, e.g. using combineByKey in batch to implement stateful (unordered) ParDo; even in streaming it would probably be possible to push some computation before the shuffle (timers would trigger shuffling the state and merging it downstream)
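Point (b) can be illustrated with a small plain-Java sketch. It assumes a hypothetical mergeable, order-insensitive state (a per-key count and sum); `PreShuffleMerge`, `foldBundle` and `mergeAfterShuffle` are illustrative names, not Beam API. Each bundle folds its elements into partial states before the shuffle, and the partial states are merged after it, which is the same shape as combineByKey.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration (not Beam API): order-insensitive stateful work is
// folded per bundle before the shuffle, and partial states are merged after it.
class PreShuffleMerge {

  // Per-key partial state: a count and a sum, both insensitive to element order.
  record PartialState(long count, long sum) {
    static PartialState empty() {
      return new PartialState(0, 0);
    }

    PartialState add(long value) {
      return new PartialState(count + 1, sum + value);
    }

    PartialState merge(PartialState other) {
      return new PartialState(count + other.count, sum + other.sum);
    }
  }

  // Upstream of the shuffle: fold one bundle of (key, value) pairs into partial states.
  static Map<String, PartialState> foldBundle(List<Map.Entry<String, Long>> bundle) {
    Map<String, PartialState> partials = new HashMap<>();
    for (Map.Entry<String, Long> e : bundle) {
      long value = e.getValue();
      partials.compute(e.getKey(), (k, s) -> (s == null ? PartialState.empty() : s).add(value));
    }
    return partials;
  }

  // Downstream of the shuffle: merge the partial states produced by all bundles.
  static Map<String, PartialState> mergeAfterShuffle(List<Map<String, PartialState>> perBundle) {
    Map<String, PartialState> result = new HashMap<>();
    for (Map<String, PartialState> partials : perBundle) {
      partials.forEach((k, s) -> result.merge(k, s, PartialState::merge));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, PartialState> merged = mergeAfterShuffle(List.of(
        foldBundle(List.of(Map.entry("a", 1L), Map.entry("b", 2L))),
        foldBundle(List.of(Map.entry("a", 3L)))));
    System.out.println(merged.get("a")); // PartialState[count=2, sum=4]
  }
}
```

This only works because the state does not depend on element order; a DoFn that relies on ordering (e.g. one using @RequiresTimeSortedInput) could not be split this way.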

Maybe we can find even more benefits. Would this be something worth exploring? Would anyone be interested in this?

Jan

On 7/1/19 11:51 PM, Reuven Lax wrote:
The problem is that it's not obvious how to merge timers. For any obvious strategy, I think I can imagine a use case where it will be wrong.

I'm leaning toward the conclusion that we're much better off providing an onMerge callback, and letting the user explicitly handle the merging there.
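To illustrate why a fixed rule is hard to choose, here is a hypothetical plain-Java sketch (Beam has no such callback; `OnMerge`, `EARLIEST`, `LATEST` and `delayAfterLatest` are made-up names). When two windows with pending timers merge, "earliest" and "latest" give different answers, and a user-supplied onMerge callback can express rules that neither fixed strategy covers.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch only: not an existing Beam API; all names are illustrative.
class TimerMergeSketch {

  // A pending event-time timer, in milliseconds.
  record Timer(long timestampMillis) {}

  // Hypothetical user callback: given the timers of all windows being merged,
  // return the single timer the merged window should keep.
  interface OnMerge {
    Timer onMerge(List<Timer> timers);
  }

  // Two "obvious" fixed strategies a runner could hard-code.
  static final OnMerge EARLIEST =
      timers -> timers.stream().min(Comparator.comparingLong(Timer::timestampMillis)).orElseThrow();
  static final OnMerge LATEST =
      timers -> timers.stream().max(Comparator.comparingLong(Timer::timestampMillis)).orElseThrow();

  // A user-specific rule neither fixed strategy expresses (hypothetical use case):
  // fire a fixed delay after the latest pending timer.
  static OnMerge delayAfterLatest(long delayMillis) {
    return timers -> new Timer(LATEST.onMerge(timers).timestampMillis() + delayMillis);
  }

  public static void main(String[] args) {
    List<Timer> pending = List.of(new Timer(10_000), new Timer(20_000));
    System.out.println(EARLIEST.onMerge(pending)); // Timer[timestampMillis=10000]
    System.out.println(LATEST.onMerge(pending)); // Timer[timestampMillis=20000]
    System.out.println(delayAfterLatest(5_000).onMerge(pending)); // Timer[timestampMillis=25000]
  }
}
```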

Reuven

On Mon, Jul 1, 2019 at 1:04 AM Jan Lukavský <[email protected]> wrote:

    What are the issues you see with merging timers? Does it "only"
    suffer from the same issue as early emission from merging windows
    (i.e. it needs retractions to work correctly), or are there other
    issues?

    On 6/28/19 3:48 PM, Reuven Lax wrote:
    We do have merging state. However, merging timers is a bit more
    awkward. I now tend to think that we're better off providing an
    onMerge function and letting the user handle this.

    On Fri, Jun 28, 2019, 11:06 AM Jan Lukavský <[email protected]> wrote:

        Hi,

        During my implementation of @RequiresTimeSortedInput, I found out
        that current runners do not support stateful DoFns on merging
        windows [1]. The question is why. One obvious reason seems to be
        that the current definition of StateSpec doesn't define a state
        merge function, which is necessary for merging windows to work.
        Actually, in Euphoria [2] (which originated separately and is now
        merged into Beam) we had only two basic operators (PTransforms):
        FlatMap (stateless ParDo) and an operation we called
        ReduceStateByKey [3] (not merged into Beam, as there were some
        difficulties, one of which might be the missing support for
        merging windows). All the other operations could be derived from
        these two. The ReduceStateByKey (RSBK) operator was a keyed
        operator (shuffle) with the following user-defined functions:

          - state factory (roughly equivalent to declaring a StateSpec)

          - state merge function (State1 + State2 = State3)

          - state update function (State + Value = new State)

          - and state extract function (State -> Output), which was
            actually called after each element was added to the state
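The four functions above can be modelled in plain Java roughly as follows. This is a sketch under assumptions: `RsbkSketch`, `Rsbk`, `run` and `mergeAndExtract` are illustrative names, not the Euphoria or Beam API, and everything runs in memory. A running sum shows the Combine-like case; the extract function is applied after every element, as described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java model of the four RSBK user functions. Illustration only,
// not the Euphoria ReduceStateByKey or Beam StateSpec API.
class RsbkSketch {

  record Rsbk<S, V, O>(
      Supplier<S> stateFactory,        // state factory (~ declaring a StateSpec)
      BinaryOperator<S> stateMerge,    // State1 + State2 = State3
      BiFunction<S, V, S> stateUpdate, // State + Value = new State
      Function<S, O> stateExtract) {}  // State -> Output

  // Processes keyed input in arrival order, emitting an output after each element.
  static <K, S, V, O> List<Map.Entry<K, O>> run(Rsbk<S, V, O> op, List<Map.Entry<K, V>> input) {
    Map<K, S> states = new HashMap<>();
    List<Map.Entry<K, O>> outputs = new ArrayList<>();
    for (Map.Entry<K, V> e : input) {
      S current = states.computeIfAbsent(e.getKey(), k -> op.stateFactory().get());
      S updated = op.stateUpdate().apply(current, e.getValue());
      states.put(e.getKey(), updated);
      outputs.add(Map.entry(e.getKey(), op.stateExtract().apply(updated)));
    }
    return outputs;
  }

  // Merges two states for the same key (e.g. of two merging windows) and extracts the output.
  static <S, V, O> O mergeAndExtract(Rsbk<S, V, O> op, S left, S right) {
    return op.stateExtract().apply(op.stateMerge().apply(left, right));
  }

  public static void main(String[] args) {
    // A running sum per key: Combine-like behaviour expressed through RSBK.
    Rsbk<Long, Long, Long> sum = new Rsbk<>(() -> 0L, Long::sum, Long::sum, s -> s);
    List<Map.Entry<String, Long>> input =
        List.of(Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 5L));
    System.out.println(run(sum, input)); // [a=1, a=3, b=5]
    System.out.println(mergeAndExtract(sum, 3L, 5L)); // 8
  }
}
```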

        Now if you look at this, it is essentially both Beam's Combine
        operator and stateful ParDo. Moreover, GroupByKey is just RSBK
        with BagState and an append merge function. So a question that
        comes to mind: what would happen if we added a state merge
        function to StateSpec? I think it could have the following
        benefits:

          - Both Combine and GroupByKey could become derived operators
            (this is not a breaking change, as runners are always free to
            provide their own override of any PTransform)

          - in batch, stateful ParDo could be implemented more
            efficiently using a Combine operation (as long as the DoFn
            is not annotated with @RequiresTimeSortedInput, which is my
            favourite :))

          - even in streaming, a combining approach to stateful ParDo
            would be possible (provided the trigger is AfterWatermark
            with no early firings, and there are no user timers)

          - there is still a problem with merging windows on stateful
            DoFns, which is early firings in general (that needs
            retractions; we first hit this in [4] and solved it by
            disabling early emitting from merging windows)
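A hypothetical sketch of the first point: if a StateSpec-like description carried a merge function, both GroupByKey (bag state with append merge) and a Combine (accumulator with its own merge) could be expressed through the same abstraction. `MergeableSpec`, `groupByKeySpec` and `sumSpec` are made-up names, not the Beam StateSpec interface. For the sum, merging the states of two merging windows equals folding all their values into one state; for the bag, it yields the same elements in an order that depends on merge order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical sketch: a StateSpec-like description extended with a merge function.
// Not Beam's StateSpec interface; names are illustrative.
class MergeableStateSketch {

  record MergeableSpec<S, V>(Supplier<S> create, BiFunction<S, V, S> add, BinaryOperator<S> merge) {

    // Folds a list of values into a fresh state.
    S foldAll(List<V> values) {
      S state = create.get();
      for (V v : values) {
        state = add.apply(state, v);
      }
      return state;
    }
  }

  // GroupByKey as "bag state + append merge". The merged order depends on the
  // order in which states are merged, so no value ordering is implied.
  static <V> MergeableSpec<List<V>, V> groupByKeySpec() {
    return new MergeableSpec<>(
        ArrayList::new,
        (bag, v) -> {
          List<V> copy = new ArrayList<>(bag);
          copy.add(v);
          return copy;
        },
        (left, right) -> {
          List<V> copy = new ArrayList<>(left);
          copy.addAll(right);
          return copy;
        });
  }

  // Combine (here a sum) as an accumulator state whose merge function is addition.
  static MergeableSpec<Long, Long> sumSpec() {
    return new MergeableSpec<>(() -> 0L, Long::sum, Long::sum);
  }

  public static void main(String[] args) {
    // Two merging windows for the same key: fold each window, then merge the states.
    MergeableSpec<List<String>, String> gbk = groupByKeySpec();
    System.out.println(gbk.merge().apply(gbk.foldAll(List.of("x", "y")), gbk.foldAll(List.of("z")))); // [x, y, z]

    MergeableSpec<Long, Long> sum = sumSpec();
    System.out.println(sum.merge().apply(sum.foldAll(List.of(1L, 2L)), sum.foldAll(List.of(3L)))); // 6
  }
}
```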

        I'd really like to hear any comments on this.

        Jan

        [1] https://github.com/apache/beam/blob/1992cde69343b6e8bb5eea537182af3d036d155d/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L71

        [2] https://github.com/apache/beam/tree/master/sdks/java/extensions/euphoria

        [3] https://github.com/seznam/euphoria/blob/master/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java

        [4] https://github.com/seznam/euphoria/issues/43
