Hi Robert,
yes, the performance of runners that do not use a sort-merge grouping
strategy would be affected by (1) (and that's why I proposed the ability
to opt out). The testing argument could be resolved, because there is no
need for test runners (probably the local ones) to adopt the
zero-lateness (sorted) policy. It is really only an optimization that is
needed in (certain) large-scale cases. Test runners would still be free
to sort by time in ascending order if they choose to (because fully
streaming semantics can be used there, and ascending order falls into
the category "undefined" :)).
There could be one more solution - only runners that actually use
sort-merge grouping could do the sorting. There is nothing stopping
them, but then we would probably end up with two groups of runners -
ones that sort (and fire timers along with the ordered elements, which
is critical in this case!) and ones that don't. I would prefer a more
consistent approach.
So, if there is a price to pay for (1) (and there is), then (2) or (3)
seems better to me as well.
As I have already posted, there is a PR [1] that actually implements
(2). Would you help me review it?
Jan
[1] https://github.com/apache/beam/pull/8774
On 11/13/19 2:24 AM, Robert Bradshaw wrote:
One concern with (1) is that it may not be cheap to do for all
runners. There also seems to be the implication that in batch elements
would be 100% in order but in streaming kind-of-in-order is OK, which
would lead to pipelines being developed/tested against stronger
guarantees than are generally provided in a streaming system. It also
means batch and streaming have different semantics, not just different
runtime characteristics, etc. (Note also that for streaming the
out-of-order limits are essentially unbounded as well, but if you fall
"too far" behind you generally have other problems so in practice it's
OK for a "healthy" pipeline.)
I think (2) is the most consistent, as we can't meaningfully limit the
amount of unboundedness to say a particular runner (or mode) has
violated it.
On Tue, Nov 12, 2019 at 1:36 AM Jan Lukavský <[email protected]> wrote:
Hi,
this is a follow-up to multiple threads covering the topic of how to
process event streams in a unified way. Event streams can be
characterized by a common property: the ordering of events matters. The
processing (usually) looks something like
unordered stream -> buffer (per key) -> ordered stream -> stateful
logic (DoFn)
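The "buffer (per key) -> ordered stream" step above can be sketched in plain Java, with no Beam dependency. The `Event` class, the per-key `PriorityQueue` buffer, and the `onWatermark` trigger below are all hypothetical names for illustration only; a real runner would back the buffer with its state mechanism:

```java
import java.util.*;

// Hypothetical timestamped element, for illustration only.
final class Event {
    final String key;
    final long timestamp;
    final String payload;
    Event(String key, long timestamp, String payload) {
        this.key = key;
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

// Sketch of the buffering step: elements arrive unordered, are held in a
// per-key priority queue ordered by timestamp, and are released in
// ascending time order once the watermark passes their timestamp.
final class TimeSortingBuffer {
    private final Map<String, PriorityQueue<Event>> buffers = new HashMap<>();

    void add(Event e) {
        buffers.computeIfAbsent(e.key,
                k -> new PriorityQueue<>(
                        Comparator.comparingLong((Event ev) -> ev.timestamp)))
               .add(e);
    }

    // Emit, per key, everything with timestamp <= watermark, in time order.
    List<Event> onWatermark(long watermark) {
        List<Event> out = new ArrayList<>();
        for (PriorityQueue<Event> q : buffers.values()) {
            while (!q.isEmpty() && q.peek().timestamp <= watermark) {
                out.add(q.poll());
            }
        }
        return out;
    }
}
```

The downstream stateful logic (DoFn) then sees each key's elements in event-time order.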
This is perfectly fine and can be solved with the tools Beam currently
offers (state & timers), but *only for the streaming case*. The batch
case is essentially broken, because:
a) out-of-orderness is essentially *unbounded* (as opposed to the input
being bounded - strangely, that is not a contradiction); out-of-orderness
in the streaming case is *bounded*, because the watermark can fall behind
only a limited amount of time (sooner or later, nobody would actually
care about results from a streaming pipeline being months or years late,
right?)
b) with unbounded out-of-orderness, the spatial requirements of state
grow as O(N) in the worst case, where N is the size of the whole input
c) moreover, many runners restrict the size of per-key state to fit in
memory (Spark, Flink)
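Points a) and b) can be made concrete with a small simulation (an illustration, not runner code - the method name and setup are assumptions): in batch, the watermark effectively stays at negative infinity until the input is exhausted, so a time-sorting buffer can release nothing while elements arrive and must peak at N buffered elements:

```java
import java.util.PriorityQueue;

// Illustration of worst-case O(N) state in batch: no element can be
// released until the watermark advances, which in batch happens only
// after the whole (bounded) input has been consumed.
final class BatchBufferGrowth {
    static int peakBufferedElements(long[] timestamps) {
        PriorityQueue<Long> buffer = new PriorityQueue<>();
        int peak = 0;
        // Watermark is effectively -infinity while elements arrive.
        for (long ts : timestamps) {
            buffer.add(ts);
            peak = Math.max(peak, buffer.size());
        }
        // Watermark jumps to +infinity: everything now drains in time order,
        // but the buffer already held the entire input, i.e. O(N) state.
        while (!buffer.isEmpty()) {
            buffer.poll();
        }
        return peak;
    }
}
```

In streaming, by contrast, the watermark advances continuously, so the buffer only ever holds elements within the (bounded) out-of-orderness window.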
Now, the solutions to these problems seem to be:
1) refine the model guarantees for batch stateful processing so that
we limit the out-of-orderness (the source of the issues here) - the only
reasonable way to do that is to enforce sorting before all stateful
DoFns in the batch case (perhaps with an opt-out), or
2) define a way to mark a stateful DoFn as requiring sorted input (e.g.
@RequiresTimeSortedInput) - note this has to be done for both the batch
and streaming case, as opposed to 1), or
3) define a different URN for "ordered stateful DoFn", with a default
expansion using state as a buffer (for both the batch and streaming
case) - that way it can be overridden in batch runners that would
otherwise get into trouble (and this could be regarded as a natural
extension of the current approach).
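To make option 2) more tangible, here is a rough sketch of what the user-facing side could look like. The annotation name comes from the proposal above, but the declaration, the method signature, and the runner behavior described in the comments are all hypothetical, not the actual Beam API:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical marker annotation a runner could inspect to decide whether
// the input of a stateful DoFn must be sorted by event time.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresTimeSortedInput {}

// Sketch of a user's stateful DoFn opting into time-sorted input.
final class OrderedStatefulFn {
    // A runner detecting this annotation would insert a time-sorting step
    // before this method - e.g. sort-merge grouping in batch, or a
    // state-backed buffer flushed on watermark advance in streaming.
    @RequiresTimeSortedInput
    public void processElement(long timestamp, String element) {
        // order-dependent stateful logic goes here
    }
}
```

The appeal of this shape is that the contract is declared once, on the DoFn, and each runner is free to satisfy it with whatever mechanism suits its execution mode.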
I still think the best solution is 1), for multiple reasons, ranging
from being internally logically consistent to being practical and easily
implemented (a few lines of code in Flink's case, for instance). On the
other hand, if this is really not what we want to do, then I'd like to
know the community's opinion on the other two options (or on some other
option I didn't cover).
Many thanks for your opinions and for help with fixing what is (sort of)
broken right now.
Jan