Hi,
this is starting to be really exciting. It seems to me that there is
either something wrong with my definition of the "Unified model" or with
how it is implemented inside (at least) the Direct and Flink runners.
So, first, what I see as properties of the Unified model:
a) streaming semantics (i.e. what I can express using transforms) are a
subset of batch semantics
- this is true; batch and streaming semantics differ only in that I
can have a GlobalWindow with the default trigger in batch mode but not
in streaming mode
b) the runtime conditions of batch have to be a subset of the streaming
conditions
- this is because otherwise it might be intractable to run a
streaming pipeline on a batch engine
- generally this is also true - in batch mode the watermark advances
only between two states (-inf and +inf), which makes it possible to turn
(most) stateful operations into group-by-key operations, and to take
advantage of many other optimizations (the ability to re-read inputs
makes it possible to drop checkpointing, etc.)
Now, there is also one not so obvious runtime condition of streaming
engines - namely, how skewed the watermark and the event time of
elements being processed can be. If this skew gets too high (i.e. the
watermark is not moving, and/or elements are very out-of-order), then
the processing might become intractable, because everything might have
to be buffered.
On batch engines this is generally not an issue, because the buffering
is eliminated by sorting - when a group-by-key operation occurs, batch
runners sort elements so that those with the same key are together, and
therefore eliminate the need for a potentially unbounded buffer.
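The sort-based grouping described above can be sketched in plain Python (this is an illustration of the idea, not runner code): after sorting by key, each group is contiguous, so the group-by needs no per-key buffering across the whole input.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sketch: a batch runner's shuffle/sort phase brings equal
# keys together, so grouping is a single linear pass with O(1) extra
# state per group instead of a buffer holding the whole input.
records = [("b", 2), ("a", 1), ("b", 3), ("a", 4)]
records.sort(key=itemgetter(0))  # stands in for the runner's sort phase
grouped = {k: [v for _, v in g]
           for k, g in groupby(records, key=itemgetter(0))}
```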
Where this turns out to be an issue is whenever there is a stateful
ParDo operation, because then (without sorting) property b) is violated
- on a streaming engine the difference between element timestamp and
watermark will generally tend to be low (and late events will be dropped
to restrict the size of buffers), but in batch it can be arbitrarily
large, and therefore the size of the buffers that would be needed is
potentially unbounded.
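To make the buffering argument above concrete, here is a small plain-Python model (an assumption-laden sketch, not Beam semantics): an operation that must see elements in event-time order can only flush its buffer up to the watermark, modeled here with perfect knowledge as the minimum timestamp still to arrive. Sorted input keeps the buffer tiny; reversed input forces it to hold everything.

```python
def max_buffer_size(events):
    """events: timestamps in arrival order.

    Models a stateful operation that buffers elements until the
    (idealized) watermark -- the minimum of all timestamps yet to
    arrive -- guarantees no earlier element can still show up.
    Returns the peak buffer size.
    """
    buffered, peak = [], 0
    for i, t in enumerate(events):
        buffered.append(t)
        peak = max(peak, len(buffered))
        watermark = min(events[i + 1:], default=float("inf"))
        # flush everything at or below the watermark; it is safe now
        buffered = [b for b in buffered if b > watermark]
    return peak
```

With timestamp-sorted input the peak stays at 1; with fully reversed input the peak equals the input size, which is the "potentially unbounded buffer" problem in miniature.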
This line of thinking leads me to the conclusion that if Beam doesn't
(on purpose) sort elements by timestamp before a stateful ParDo, then it
basically violates the Unified model, because pipelines with a stateful
ParDo will not function properly on batch engines. This is what I
observe - there is non-determinism in the batch pipeline although
everything seems to be "well defined": elements arrive arbitrarily out
of order and are arbitrarily dropped as out of order. This leads to
different results every time the batch pipeline is run.
Looking forward to any comments on this.
Jan
On 5/16/19 10:53 AM, Aljoscha Krettek wrote:
Please take this with a grain of salt, because I might be a bit rusty on this.
I think the Beam model does not prescribe any ordering (by time or otherwise)
on inputs, mostly because always requiring it would be prohibitively expensive
on most runners - especially global sorting.
If you want to have sorting by key, you could do a GroupByKey and then sort the
groups in memory. This only works, of course, if your groups are not too large.
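The suggested workaround can be sketched in plain Python (hypothetical data and function name; not Beam code): group by key first, then sort each group by timestamp in memory, which works only while each group fits in memory.

```python
from collections import defaultdict

def group_then_sort(records):
    """records: iterable of (key, timestamp, value) tuples.

    Stands in for a GroupByKey followed by an in-memory sort of each
    group by timestamp. Assumes every group fits in memory.
    """
    groups = defaultdict(list)
    for key, ts, value in records:          # the GroupByKey stage
        groups[key].append((ts, value))
    # per-group in-memory sort by timestamp
    return {k: sorted(v) for k, v in groups.items()}
```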
On 15. May 2019, at 21:01, Jan Lukavský <je...@seznam.cz> wrote:
Hmmm, looking into the code of the FlinkRunner (and also by observing results
from the stateful ParDo), it seems that I got it wrong from the beginning. The
data is not sorted before the stateful ParDo, which surprises me a little. How
should the operator work in this case? It would mean that in the batch case I
have to hold an arbitrarily long allowedLateness inside the BagState, which
seems kind of suboptimal. Or am I missing something obvious here? I'll describe
the use case in more detail. Let's suppose I have a series of ones and zeros,
and I want to emit, at each point in time, a value of 1 if the value changes
from 0 to 1, a value of -1 if it changes from 1 to 0, and 0 otherwise. So:
0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
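The transition logic itself is trivial once the input is time-ordered; here is a plain-Python sketch of it (the function name and the initial state of 0 are my assumptions, not part of the original pipeline). The whole difficulty discussed in this thread is in getting the ordered input, not in this step.

```python
def edges(values):
    """Emit 1 on a 0->1 transition, -1 on 1->0, and 0 otherwise.

    Assumes the input is already ordered by event time and that the
    state before the first element is 0.
    """
    out, prev = [], 0
    for v in values:
        out.append(v - prev)  # 0->1 gives 1, 1->0 gives -1, no change gives 0
        prev = v
    return out
```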
Does anyone have a better idea how to solve this? And if not, how can it be made
to run in batch without a possibly infinite buffer? Should the input to a
stateful ParDo be sorted in the batch case? My intuition would be that it
should, because in my understanding of "batch as a special case of streaming",
in the batch case there is (by default) a single window, time advances from
-inf to +inf at the end, and the data contains no out-of-order data in places
where this might matter (which therefore enables some optimizations). The order
would be relevant only in the stateful ParDo, I'd say.
Jan
On 5/15/19 8:34 PM, Jan Lukavský wrote:
Just to clarify: I understand that changing the semantics of
PCollection.isBounded is probably impossible now, because it would probably
introduce a chicken-and-egg problem. Maybe I will state it more clearly - would
it be better to be able to run bounded pipelines using batch semantics on the
DirectRunner (including sorting before stateful ParDos), or would it be better
to come up with some way to notify the pipeline that it will be running in a
streaming way although it consists only of bounded inputs? I'm not saying how
to do it, just trying to find out if anyone else has ever had such a need.
Jan
On 5/15/19 5:20 PM, Jan Lukavský wrote:
Hi,
I have come across unexpected (at least to me) behavior: an apparent
inconsistency between how a PCollection is processed in the DirectRunner and
what PCollection.isBounded signals. Let me explain:
- I have a stateful ParDo which needs to make sure that elements arrive in
order - it accomplishes this by defining a BagState for buffering input
elements and sorting them inside this buffer; it also keeps track of the
element with the highest timestamp to somehow estimate a local watermark (minus
some allowed lateness), in order to know when to remove elements from the
buffer, sort them by time, and pass them on to some (time-ordered) processing
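The buffer-and-sort approach described in the first bullet can be sketched in plain Python (my own model; the class name and method shapes are assumptions, and the list stands in for Beam's BagState rather than using any actual Beam API):

```python
class OrderingBuffer:
    """Buffer elements, track the highest timestamp seen, and flush
    everything older than (max_timestamp - allowed_lateness) in
    timestamp order. Elements arriving later than that would be dropped
    in the real pipeline."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.buffer = []                # stands in for the BagState
        self.max_ts = float("-inf")

    def add(self, ts, value):
        self.buffer.append((ts, value))
        self.max_ts = max(self.max_ts, ts)
        local_watermark = self.max_ts - self.allowed_lateness
        # everything at or below the local watermark is safe to emit
        ready = sorted(t for t in self.buffer if t[0] <= local_watermark)
        self.buffer = [t for t in self.buffer if t[0] > local_watermark]
        return ready  # time-ordered elements ready for processing
```

Note how the buffer's size is bounded only by how out-of-order the input is relative to allowed_lateness, which is exactly why arbitrarily unordered batch input is problematic for this scheme.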
- this seems to work well for streaming (unbounded) data
- for batch (bounded) data, the semantics of a stateful ParDo should be (please
correct me if I'm wrong) that elements always arrive in order, because the
runner can sort them by timestamp
- this implies that for batch-processed (bounded) input the allowedLateness can
be set to zero, so that the processing is a little more efficient, because it
doesn't have to use the BagState at all
- now, the trouble seems to be that the DirectRunner always uses streaming
processing, even if the input is bounded (which is by definition possible), but
there is currently no way to know when it is possible to change the allowed
lateness to zero (because the input will arrive ordered)
- so, it seems to me that either the DirectRunner should apply sorting before a
stateful ParDo when it processes bounded data (the same way other runners do),
or it can apply streaming processing, but then it should change
PCollection.isBounded to UNBOUNDED, even if the input is originally bounded
- that way, the semantics of PCollection.isBounded would be not whether the
data is known in advance to be finite, but *how* the data is going to be
processed, which is much more valuable (IMO)
Any thoughts?
Jan