Hi Max,

answers inline.

---------- Původní e-mail ----------
Od: Maximilian Michels <m...@apache.org>
Komu: dev@beam.apache.org
Datum: 16. 5. 2019 15:59:59
Předmět: Re: Definition of Unified model (WAS: Semantics of PCollection.
isBounded)
"Hi Jan,

Thanks for the discussion. Aljoscha already gave great answers. Just a
couple of remarks:

> a) streaming semantics (i.e. what I can express using Transforms) are 
subset of batch semantics

I think you mean streaming is a superset of batch, or batch is a subset 
of streaming. This is the ideal. In practice, the two execution modes
are sometimes accomplished by two different execution engines. Even in
Flink, we have independent APIs for batch and streaming and the
execution semantics are slightly different. For example, there are no
watermarks in the batch API. Thus, batch rarely is simply an execution
mode of streaming. However, I still think the unified Beam model works
in both cases.

> batch semantics and streaming semantics differs only in that I can have 
GlobalWindow with default trigger on batch and cannot on stream "
Actually I really thought, that regarding semantics, streaming should be 
subset of batch. That is because in batch, you can be sure that the
watermark will eventually approach infinity. That gives you one additional
feature, that streaming generally doesn't have (if you don't manually
forward watermark to infinity as you suggest).

"

You can have a GlobalWindow in streaming with a default trigger. You
could define additional triggers that do early firings. And you could
even trigger the global window by advancing the watermark to +inf. "
Yes, but then you actually changed streaming to batch, you just execute 
batch pipeline in streaming way.

"

> On batch engines, this is generally not an issue, because the buffering is
eliminated by sorting - when a Group by operation occurs, batch runners sort
elements with the same key to be together and therefore eliminate the need
for potentially infinite cache.

The batch engines you normally use might do that. However, I do not see 
how sorting is an inherent property of the streaming model. We do not
guarantee a deterministic order of events in streaming with respect to
event time. In that regard, batch is a true subset of streaming because 
we make no guarantees on the order. Actually, because we only advance
the watermark from -inf to +inf once we have read all data, this nicely 
aligns with the streaming model. "



Sure, streaming, doesn't  have the time ordering guarantees. Having so would
be impractical. But - there is no issues in having these quarantees in batch
mode, moreover, it gives the pipelines, that need to have "bounded out of 
orderness" the chance to ever finish.




I think that there is some issues in how we think about the properties of 
batch vs. stream. If we define streaming as the superset, then we cannot 
define some properties for batch, that streaming doesn't have. But - if we
just split it on the part of semantics and on the part of runtime properties
and guarantees, than it is possible to define properties of batch, that 
streaming doesn't have.




Jan



"

-Max

On 16.05.19 15:20, Aljoscha Krettek wrote:
> Hi,
>
> I think it’s helpful to consider that events never truly arrive in order
in the real world (you mentioned as much yourself). For streaming use cases,
there might be some out-of-orderness (or a lot of it, depending on the use
case) so your implementation has to be able to deal with that. On the other
end of the spectrum we have batch use cases, where out-of-orderness is
potentially even bigger because it allows for more efficient parallel
execution. If your implementation can deal with out-of-orderness that also
shouldn’t be a problem.
>
> Another angle is completeness vs. latency: you usually cannot have both in
a streaming world. If you want 100 % completeness, i.e. you want to ensure
that you process all events and never drop anything, you can never advance
the watermark from its initial -Inf if you want to also never have watermark
violations. In typical use cases I would expect any sorting guarantees to be
constantly violated, unless you are willing to drop late data.
>
> I think these are some reasons why there is no mention of ordering by 
timestamp anywhere (unless I’m mistaken and there is somewhere). 
>
> You are right, of course, that batch-style runners can use grouping/
sorting for a GroupByKey operation. Flink does that and even allows sorting
by secondary key, so you could manually sort by timestamp as a secondary key
with hardly any additional cost. However, exposing that in the model would
make implementing Runners quite hard, or they would be prohibitively slow.
>
> You’re also right that user functions that do arbitrary stateful
operations can be quite dangerous and lead to unexpected behaviour. You 
example of reacting to changes in 0 and 1 would produce wrong results if 
events are not 100% sorted by timestamp. In general, state changes that rely
on processing order are problematic while operations that move monotonously
though some space are fine. Examples of such operations are adding elements
to a set or summing numbers. If you “see” a given set of events you can
apply them to state in any order and as long as you see the same set of 
events on different executions the result will be the same.
>
> As for the Beam execution model in relation to processing and time, I 
think the only “guarantees” are:
> - you will eventually see all events
> - the timestamp of those events is usually not less than the watermark 
(but not always)
> - the watermark will advance when the system thinks you won’t see events
with a smaller timestamp in the future (but you sometimes might)
>
> Those seem quite “poor”, but I think you can’t get better guarantees for
general cases for the reasons mentioned above. Also, this is just of the top
of my head and I might be wrong in my understanding of the Beam model. :-O
>
> Best,
> Aljoscha
>
>> On 16. May 2019, at 13:53, Jan Lukavský <je...@seznam.cz> wrote: 
>>
>> Hi,
>>
>> this is starting to be really exciting. It seems to me that there is 
either something wrong with my definition of "Unified model" or with how it
is implemented inside (at least) Direct and Flink Runners.
>>
>> So, first what I see as properties of Unified model:
>>
>> a) streaming semantics (i.e. what I can express using Transforms) are 
subset of batch semantics
>>
>> - this is true, batch semantics and streaming semantics differs only in
that I can have GlobalWindow with default trigger on batch and cannot on 
stream
>>
>> b) runtime conditions of batch have to be subset of streaming conditions
>>
>> - this is because otherwise it might be intractable to run streaming 
pipeline on batch engine
>>
>> - generally this is also true - in batch mode watermark advances only 
between two states (-inf and +inf), which makes it possible to turn (most)
stateful operations into group by key operations, and take advantage of many
other optimizations (ability to re-read inputs make it possible to drop 
checkpointing, etc, etc)
>>
>> Now there is also one not so obvious runtime condition of streaming
engines - that is how skewed watermark and event time of elements being 
processed can be - if this gets too high (i.e. watermark is not moving, and/
or elements are very out-of-order, then the processing might become
intractable, because everything might have to be buffered).
>>
>> On batch engines, this is generally not an issue, because the buffering
is eliminated by sorting - when a Group by operation occurs, batch runners
sort elements with the same key to be together and therefore eliminate the
need for potentially infinite cache.
>>
>> When this turns out to be an issue, is whenever there is a stateful ParDo
operation, because then (without sorting) there is violation of property b)
- on streaming engine the difference between element timestamp and watermark
will tend to be generally low (and late events will be dropped to restrict
the size of buffers), but on batch it can be arbitrarily large and therefore
size buffers that would be needed is potentially unbounded.
>>
>> This line of thinking leads me to a conclusion, that if Beam doesn't (on
purpose) sort elements before stateful ParDo by timestamp, then it basically
violates the Unified model, because pipelines with stateful ParDo will not
function properly on batch engines. Which is what I observe - there is non
determinism on batch pipeline although everything seems to be "well
defined", elements arrive arbitrarily out of order and are arbitrarily out
of order dropped. This leads to different results everytime batch pipeline
is run.
>>
>> Looking forward to any comments on this.
>>
>> Jan
>>
>> On 5/16/19 10:53 AM, Aljoscha Krettek wrote:
>>> Please take this with a grain of salt, because I might be a bit rusty on
this.
>>>
>>> I think the Beam model does not prescribe any ordering (by time or
otherwise) on inputs. Mostly because always requiring it would be
prohibitively expensive on most Runners, especially global sorting.
>>>
>>> If you want to have sorting by key, you could do a GroupByKey and then
sort the groups in memory. This only works, of course, if your groups are 
not too large.
>>>
>>>> On 15. May 2019, at 21:01, Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hmmm, looking into the code of FlinkRunner (and also by observing
results from the stateful ParDo), it seems, that I got it wrong from the 
beginning. The data is not sorted before the stateful ParDo, but that a 
little surprises me. How the operator should work in this case? It would 
mean, that in the batch case I have to hold arbitrarily long allowedLateness
inside the BagState, which seems to be kind of suboptimal. Or am I missing
something obvious here? I'll describe the use case in more detail, let's 
suppose I have a series of ones and zeros and I want emit at each time point
value of 1 if value changes from 0 to 1, value of -1 if changes from 1 to 0
and 0 otherwise. So:
>>>>
>>>> 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
>>>>
>>>> Does anyone have a better idea how to solve it? And if not, how to make
it running on batch, without possibly infinite buffer? Should the input to
stateful ParDo be sorted in batch case? My intuition would be that it should
be, because in my understanding of "batch as a special case of streaming" in
batch case, there is (by default) single window, time advances from -inf to
+inf at the end, and the data contains no out of order data, in places where
this might matter (which therefore enables some optimizations). The order 
would be relevant only in the stateful ParDo, I'd say.
>>>>
>>>> Jan
>>>>
>>>> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>>>>> Just to clarify, I understand, that changing semantics of the
PCollection.isBounded, is probably impossible now, because would probably 
introduce chicken egg problem. Maybe I will state it more clearly - would it
be better to be able to run bounded pipelines using batch semantics on
DirectRunner (including sorting before stateful ParDos), or would it be 
better to come up with some way to notify the pipeline that it will be
running in a streaming way although it consists only of bounded inputs? And
I'm not saying how to do it, just trying to find out if anyone else ever had
such a need.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I have come across unexpected (at least for me) behavior of some 
apparent inconsistency of how a PCollection is processed in DirectRunner and
what PCollection.isBounded signals. Let me explain:
>>>>>>
>>>>>> - I have a stateful ParDo, which needs to make sure that elements 
arrive in order - it accomplishes this by defining BagState for buffering 
input elements and sorting them inside this buffer, it also keeps track of
element with highest timestamp to somehow estimate local watermark (minus 
some allowed lateness), to know when to remove elements from the buffer, 
sort them by time and pass them to some (time ordered) processing
>>>>>>
>>>>>> - this seems to work well for streaming (unbounded) data
>>>>>>
>>>>>> - for batch (bounded) data the semantics of stateful ParDo should be
(please correct me if I'm wrong) that elements always arrive in order,
because the runner can sort them by timestamp
>>>>>>
>>>>>> - this implies that for batch processed input (bounded) the
allowedLateness can be set to zero, so that the processing is little more 
effective, because it doesn't have to use the BagState at all
>>>>>>
>>>>>> - now, the trouble seems to be, that DirectRunner always uses
streaming processing, even if the input is bounded (that is by definition 
possible), but there is no way now to know when it is possible to change 
allowed lateness to zero (because input will arrive ordered)
>>>>>>
>>>>>> - so - it seems to me, that either DirectRunner should apply sorting
to stateful ParDo, when it processes bounded data (the same way that other
runners do), or it can apply streaming processing, but then it should change
PCollection.isBounded to UNBOUNDED, even if the input is originally bounded
>>>>>>
>>>>>> - that way, the semantics of PCollection.isBounded, would be not if
the data are known in advance to be finite, but *how* the data are going to
be processed, which is much more valuable (IMO)
>>>>>>
>>>>>> Any thoughts?
>>>>>>
>>>>>> Jan
>>>>>>
>
"

Reply via email to