Hi,
during my implementation of @RequiresTimeSortedInput I found out that
current runners do not support stateful DoFns on merging windows [1].
Why is that? One obvious reason seems to be that the current definition
of StateSpec doesn't include a state merge function, which is necessary
for merging windows to work. In Euphoria [2] (which originated
separately and has since been merged into Beam) we had only two basic
operators (PTransforms): FlatMap (a stateless ParDo) and an operation we
called ReduceStateByKey [3] (not merged into Beam, as there were some
difficulties, one of which might be the missing support for merging
windows). All other operations could be derived from these two.
ReduceStateByKey (RSBK) was a keyed operator (shuffle) with the
following user-defined functions:
- state factory (roughly equivalent to declaring a StateSpec)
- state merge function (State1 + State2 = State3)
- state update function (State + Value = new State)
- and state extract function (State -> Output) -- actually called
after each element was added to the state
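
To make the four functions concrete, here is a minimal, plain-Java sketch
(no Beam or Euphoria dependency). All names in it (StateFns,
reduceStateByKey, entry) are made up for illustration and are not the
actual Euphoria API; it only mimics the shape described above, with a
per-key running sum as the example state. The four functions line up
roughly with CombineFn's createAccumulator, mergeAccumulators, addInput
and extractOutput.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch of an RSBK-like operator; not Beam or Euphoria API. */
class ReduceStateByKeySketch {

  /** The four user-defined functions of the operator. */
  static final class StateFns<V, S, O> {
    final Supplier<S> factory;        // state factory: creates an empty state
    final BinaryOperator<S> merge;    // State1 + State2 = State3
    final BiFunction<S, V, S> update; // State + Value = new State
    final Function<S, O> extract;     // State -> Output

    StateFns(Supplier<S> factory, BinaryOperator<S> merge,
             BiFunction<S, V, S> update, Function<S, O> extract) {
      this.factory = factory;
      this.merge = merge;
      this.update = update;
      this.extract = extract;
    }
  }

  /** Small helper to build a key/value pair. */
  static <K, V> Map.Entry<K, V> entry(K key, V value) {
    return new AbstractMap.SimpleEntry<>(key, value);
  }

  /**
   * Keeps one state per key and emits an output after each element.
   * The merge function is not needed here because this driver models a
   * single non-merging window; a runner would call it when two windows
   * of the same key are merged.
   */
  static <K, V, S, O> List<Map.Entry<K, O>> reduceStateByKey(
      List<Map.Entry<K, V>> input, StateFns<V, S, O> fns) {
    Map<K, S> states = new LinkedHashMap<>();
    List<Map.Entry<K, O>> output = new ArrayList<>();
    for (Map.Entry<K, V> element : input) {
      K key = element.getKey();
      S current = states.containsKey(key) ? states.get(key) : fns.factory.get();
      S next = fns.update.apply(current, element.getValue());
      states.put(key, next);
      output.add(entry(key, fns.extract.apply(next)));
    }
    return output;
  }
}
```

With StateFns<Integer, Integer, Integer>(() -> 0, Integer::sum,
Integer::sum, s -> s) this behaves like a running per-key sum; with a
list as the state and append as both update and merge it would behave
like a GroupByKey that emits the growing group.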
If you look at this, it is essentially both Beam's Combine operator and
stateful ParDo. Moreover, GroupByKey is just RSBK with BagState and an
append merge function. So a question comes to mind: what would happen
if we added a state merge function to StateSpec? I think it could have
the following benefits:
- both Combine and GroupByKey could become derived operators (this is
not a breaking change, as runners are always free to provide their own
override for any PTransform)
- in batch, stateful ParDo could be implemented more efficiently,
using a Combine operation (as long as it doesn't use
@RequiresTimeSortedInput, which is my favourite :))
- even in streaming, a combining approach to stateful ParDo would be
possible (provided the trigger is AfterWatermark with no early
firings, and there are no user timers)
- one problem would still remain with merging windows on stateful
DoFns, namely early firings in general (these need retractions, which
is what we first hit in [4] and solved by disabling early emitting from
merging windows)
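
To illustrate what a state merge function would be used for, here is a
rough plain-Java sketch of merging overlapping windows and combining
their states. Everything here is hypothetical (WindowedState,
mergeOverlapping and append are made-up names, not Beam API), and it
assumes half-open windows [start, end) that arrive sorted by start, so
touching windows are treated as disjoint. The append function plays the
role of the "BagState with append merge" that would give GroupByKey.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical sketch of state merging on window merge; not Beam API. */
class MergingStateSketch {

  /** A half-open window [start, end) together with its state. */
  static final class WindowedState<S> {
    final long start;
    final long end;
    final S state;

    WindowedState(long start, long end, S state) {
      this.start = start;
      this.end = end;
      this.state = state;
    }
  }

  /**
   * Merges overlapping windows and combines their states with the given
   * merge function. Assumes the input is sorted by window start.
   */
  static <S> List<WindowedState<S>> mergeOverlapping(
      List<WindowedState<S>> sortedByStart, BinaryOperator<S> merge) {
    List<WindowedState<S>> merged = new ArrayList<>();
    for (WindowedState<S> w : sortedByStart) {
      int lastIndex = merged.size() - 1;
      if (lastIndex >= 0 && w.start < merged.get(lastIndex).end) {
        // Overlap: extend the previous window and merge the two states.
        WindowedState<S> last = merged.remove(lastIndex);
        merged.add(new WindowedState<>(
            last.start, Math.max(last.end, w.end), merge.apply(last.state, w.state)));
      } else {
        merged.add(w);
      }
    }
    return merged;
  }

  /** Append merge for a bag-like state: State1 + State2 = State3. */
  static <T> List<T> append(List<T> left, List<T> right) {
    List<T> result = new ArrayList<>(left);
    result.addAll(right);
    return result;
  }
}
```

Without such a merge function the runner has no way of knowing what to
do with the two states when their windows merge, which is presumably why
the check in [1] rejects this case.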
I'd really like to hear any comments on this.
Jan
[1] https://github.com/apache/beam/blob/1992cde69343b6e8bb5eea537182af3d036d155d/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L71
[2] https://github.com/apache/beam/tree/master/sdks/java/extensions/euphoria
[3] https://github.com/seznam/euphoria/blob/master/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java
[4] https://github.com/seznam/euphoria/issues/43