Hi,
this is a follow-up to multiple threads on how to process event streams
in a unified way. Event streams share a common property: the ordering
of events matters. The processing (usually) looks something like
unordered stream -> buffer (per key) -> ordered stream -> stateful
logic (DoFn)
This is perfectly fine and can be solved with the tools Beam currently
offers (state & timers), but *only in the streaming case*. The batch
case is essentially broken, because:
a) out-of-orderness in the batch case is essentially *unbounded* (even
though the input itself is bounded; strangely, that is not a
contradiction), whereas out-of-orderness in the streaming case is
*bounded*, because the watermark can fall behind only by a limited
amount of time (sooner or later, nobody would actually care about
results from a streaming pipeline being months or years late, right?)
b) with unbounded out-of-orderness, the space requirements of state
grow as O(N) in the worst case, where N is the size of the whole input
c) moreover, many runners (Spark, Flink) restrict the size of state per
key to what fits in memory
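To make a) and b) concrete, here is a minimal sketch in plain Python (not Beam code). The function name `sort_buffer` and the watermark model are assumptions for illustration only: the watermark is taken as the maximum timestamp seen minus a fixed bound, and batch is modelled as the watermark staying at minus infinity until the input is exhausted. Real runners compute watermarks differently.

```python
import heapq


def sort_buffer(timestamps, max_out_of_orderness=None):
    """Simulate the sort buffer for a single key.

    Returns (emitted_timestamps, peak_buffer_size).

    max_out_of_orderness=None models batch: nothing can be emitted
    until the whole input has been read.
    """
    buffer, emitted, peak = [], [], 0
    max_seen = float("-inf")
    for ts in timestamps:
        heapq.heappush(buffer, ts)
        peak = max(peak, len(buffer))
        max_seen = max(max_seen, ts)
        if max_out_of_orderness is None:
            watermark = float("-inf")  # batch: watermark does not advance
        else:
            watermark = max_seen - max_out_of_orderness
        # Everything at or below the watermark can be released in order.
        while buffer and buffer[0] <= watermark:
            emitted.append(heapq.heappop(buffer))
    # End of input: the watermark jumps to +infinity, flush the rest.
    while buffer:
        emitted.append(heapq.heappop(buffer))
    return emitted, peak


events = [2, 1, 4, 3, 6, 5, 8, 7]
streaming_out, streaming_peak = sort_buffer(events, max_out_of_orderness=1)
batch_out, batch_peak = sort_buffer(events)
```

With this input, the streaming-like run never holds more than 2 elements, while the batch-like run holds all 8 before emitting anything, i.e. the buffer grows with the size of the input.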
Now, solutions to these problems seem to be:
1) refine the model guarantees for batch stateful processing, so that
we limit the out-of-orderness (the source of issues here) - the only
reasonable way to do that is to enforce sorting before all stateful
DoFns in the batch case (perhaps with an opt-out), or
2) define a way to mark a stateful DoFn as requiring the sorting (e.g.
@RequiresTimeSortedInput) - note this has to be done for both the batch
and the streaming case, as opposed to 1), or
3) define a different URN for "ordered stateful DoFn", with a default
expansion using state as a buffer (for both the batch and the streaming
case) - that way it can be overridden in batch runners that would
otherwise get into trouble (and it could be regarded as a sort of
natural extension of the current approach).
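As a rough illustration of what "sorting before the stateful logic" buys in the batch case, here is a sketch in plain Python (again not Beam code). `run_sorted_stateful` and `count_increases` are hypothetical names, and the in-memory `sorted()` call only stands in for whatever sort a batch runner would do (for example as part of its shuffle).

```python
from itertools import groupby
from operator import itemgetter


def run_sorted_stateful(events, init_state, step):
    """events: iterable of (key, timestamp, value) in arbitrary order.

    Sorts by (key, timestamp), then feeds each key's events to `step`
    in time order. Only the state returned by `step` is kept per key,
    so no sort buffer is needed inside the stateful logic itself.
    """
    results = {}
    ordered = sorted(events, key=itemgetter(0, 1))
    for key, group in groupby(ordered, key=itemgetter(0)):
        state = init_state()
        for _, ts, value in group:
            state = step(state, ts, value)
        results[key] = state
    return results


def count_increases(state, ts, value):
    """Order-sensitive example logic: count how often the value went up."""
    previous, increases = state
    if previous is not None and value > previous:
        increases += 1
    return (value, increases)


events = [("a", 3, 10), ("a", 1, 5), ("a", 2, 7), ("b", 2, 1), ("b", 1, 4)]
result = run_sorted_stateful(events, lambda: (None, 0), count_increases)
```

Here the state per key is just the last value and a counter, regardless of how out of order the input was.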
I still think that the best solution is 1), for multiple reasons ranging
from internal logical consistency to being practical and easy to
implement (a few lines of code in Flink's case, for instance). On the
other hand, if this is really not what we want to do, then I'd like to
know the community's opinion on the two other options (or on any other
option I didn't cover).
Many thanks for opinions and help with fixing what is (sort of) broken
right now.
Jan