We do have merging state. However, merging timers is a bit more awkward. I
now tend to think that we're better off providing an onMerge function and
letting the user handle this.

On Fri, Jun 28, 2019, 11:06 AM Jan Lukavský <[email protected]> wrote:

> Hi,
>
> during my implementation of @RequiresTimeSortedInput I found out that
> current runners do not support stateful DoFns on merging windows [1].
> The question is: why is that? One obvious reason seems to be that the
> current definition of StateSpec doesn't define a state merge function,
> which is necessary for merging windows to work. Actually, in Euphoria
> [2] (originated separately, now merged in Beam) we had only two basic
> operators (PTransforms) - FlatMap (stateless ParDo) and an operation we
> called ReduceStateByKey [3] (not merged into Beam, as there were some
> difficulties, one of which might be the missing support for merging
> windows). All the other operations could be derived from these two. The
> ReduceStateByKey (RSBK) operator was a keyed operator (shuffle) with the
> following user-defined functions:
>
>   - state factory (roughly equivalent to declaring a StateSpec)
>
>   - state merge function (State1 + State2 = State3)
>
>   - state update function (State + Value = new State)
>
>   - and state extract function (State -> Output) -- actually called
> after each element was added to the state
>
> Now if you look at this, this is essentially both Beam's Combine
> operator and stateful ParDo. Moreover, GroupByKey is just RSBK with
> BagState and an append merge function. So, a question that comes to mind:
> what would happen if we added a state merge function to StateSpec? I think
> it could have the following benefits:
>
>   - Both Combine and GroupByKey can become derived operators (this is not
> a breaking change, as runners are always free to provide their override
> for any PTransform)
>
>   - in batch, stateful ParDo can now be implemented more efficiently,
> using the Combine operation (as long as it doesn't use
> @RequiresTimeSortedInput, which is my favourite :))
>
>   - even in streaming, a combining approach to stateful ParDo would be
> possible (provided the trigger is AfterWatermark with no early
> firings, and there are no user timers)
>
>   - there is still a problem with merging windows on stateful DoFns,
> which is early firings in general (those need retractions, which is what
> we first hit here [4] and solved by disabling early emitting from
> merging windows)
>
> I'd really like to hear any comments on this.
>
> Jan
>
> [1]
>
> https://github.com/apache/beam/blob/1992cde69343b6e8bb5eea537182af3d036d155d/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L71
>
> [2]
> https://github.com/apache/beam/tree/master/sdks/java/extensions/euphoria
>
> [3]
>
> https://github.com/seznam/euphoria/blob/master/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java
>
> [4] https://github.com/seznam/euphoria/issues/43
>
>
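
The four user-defined functions of RSBK listed in the quoted message can be sketched in plain Java, to make the "State1 + State2 = State3" step on merging windows concrete. This is only an illustration under assumptions: the names RsbkSketch, Rsbk, groupLike, sumLike and mergeAndExtract are hypothetical and are neither Beam nor Euphoria API, and plain lists stand in for the per-window values of a single key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch only; none of these names exist in Beam or Euphoria. */
public class RsbkSketch {

  /** The four user-defined functions of a ReduceStateByKey-like operator. */
  static final class Rsbk<V, S, O> {
    final Supplier<S> stateFactory;   // roughly: declaring a StateSpec
    final BinaryOperator<S> merge;    // State1 + State2 = State3
    final BiFunction<S, V, S> update; // State + Value = new State
    final Function<S, O> extract;     // State -> Output

    Rsbk(Supplier<S> stateFactory, BinaryOperator<S> merge,
         BiFunction<S, V, S> update, Function<S, O> extract) {
      this.stateFactory = stateFactory;
      this.merge = merge;
      this.update = update;
      this.extract = extract;
    }

    /** Folds the values of one window into a fresh state. */
    S accumulate(List<V> values) {
      S state = stateFactory.get();
      for (V value : values) {
        state = update.apply(state, value);
      }
      return state;
    }
  }

  /** GroupByKey-like operator: a bag state with an append merge. */
  static Rsbk<Integer, List<Integer>, List<Integer>> groupLike() {
    return new Rsbk<>(
        ArrayList::new,
        (a, b) -> { List<Integer> m = new ArrayList<>(a); m.addAll(b); return m; },
        (s, v) -> { List<Integer> n = new ArrayList<>(s); n.add(v); return n; },
        s -> s);
  }

  /** Combine-like operator: a running sum. */
  static Rsbk<Integer, Long, Long> sumLike() {
    return new Rsbk<>(() -> 0L, Long::sum, (s, v) -> s + v, s -> s);
  }

  /** Builds state for two windows separately, merges the states, extracts output. */
  static <V, S, O> O mergeAndExtract(Rsbk<V, S, O> op, List<V> w1, List<V> w2) {
    S merged = op.merge.apply(op.accumulate(w1), op.accumulate(w2));
    return op.extract.apply(merged);
  }

  public static void main(String[] args) {
    List<Integer> windowA = Arrays.asList(1, 2);
    List<Integer> windowB = Arrays.asList(3);
    System.out.println(mergeAndExtract(sumLike(), windowA, windowB));
    System.out.println(mergeAndExtract(groupLike(), windowA, windowB));
  }
}
```

With the two windows above, the sum variant yields 6 and the bag variant yields [1, 2, 3] once the windows merge. Timers are deliberately left out of the sketch, since merging them is the awkward part raised in the reply.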
