Reza: One could provide something like this as a utility class, but
one downside is that it is not scale invariant. It requires a tuning
parameter that, if too small, won't mitigate the problem, but if too
big, greatly increases latency. (Possibly one could define a dynamic
session-like window to solve this though...) It also might be harder
for runners that *can* cheaply present stuff in timestamp order to
optimize. (That and, in practice, our annotation-style process methods
don't lend themselves to easy composition.) I think it could work in
specific cases though.
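
A minimal sketch of what such a utility might look like, in plain Python
rather than any Beam API. The names SlackSorter, slack, add and
advance_watermark are hypothetical, chosen only for illustration; slack
is the tuning parameter discussed above.

```python
import heapq
import itertools


class SlackSorter:
    """Buffers (timestamp, value) pairs and releases them in timestamp order.

    Hypothetical helper, not part of Beam. An element is held back until
    the watermark has passed its timestamp by at least `slack`.
    """

    def __init__(self, slack):
        self.slack = slack
        self._heap = []
        # Tie-breaker so the heap never has to compare the values themselves.
        self._tiebreak = itertools.count()
        self._released_up_to = float("-inf")

    def add(self, timestamp, value):
        """Buffer an element; return False if it is too late to be ordered."""
        if timestamp <= self._released_up_to:
            return False
        heapq.heappush(self._heap, (timestamp, next(self._tiebreak), value))
        return True

    def advance_watermark(self, watermark):
        """Release, in order, all elements with timestamp <= watermark - slack."""
        cutoff = watermark - self.slack
        released = []
        while self._heap and self._heap[0][0] <= cutoff:
            timestamp, _, value = heapq.heappop(self._heap)
            released.append((timestamp, value))
        self._released_up_to = max(self._released_up_to, cutoff)
        return released
```

With a small slack, elements that trail the watermark by more than
slack are rejected by add(), so the ordering problem is not mitigated;
with a large slack, every element waits at least that long before
advance_watermark() releases it.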

More inline below.

On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Robert,
>
>  > Beam has an exactly-once model. If the data was consumed, state
> mutated, and outputs written downstream (these three are committed
> together atomically) it will not be replayed. That does not, of course,
> solve the non-determinism due to ordering (including the fact that two
> operations reading the same PCollection may view different ordering).
>
> I think what you describe is a property of a runner, not of the model,
> right? I think if I run my pipeline on Flink I will not get this
> atomicity, because although Flink also uses an exactly-once model, it might
> write outputs multiple times.

Actually, I think it is a larger (open) question whether exactly once
is guaranteed by the model or whether runners are allowed to relax
that. I would think, however, that sources correctly implemented
should be idempotent when run atop an exactly once infrastructure such
as Flink or Dataflow.

>  > 1) Is it correct for a (Stateful)DoFn to assume elements are received
> in a specific order? In the current model, it is not. Being able to
> read, handle, and produce out-of-order data, including late data, is a
> pretty fundamental property of distributed systems.
>
> Yes, absolutely. The argument here is not that Stateful ParDo should
> presume to receive elements in any order, but to _present_ it as such to
> the user @ProcessElement function.

Sounds like we should make this clearer.
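
To make the order sensitivity concrete, here is a minimal sketch in
plain Python (not Beam) of the 0/1 transition-counting example quoted
further down the thread. The function name count_transitions and the
shuffled input are assumptions made up for illustration.

```python
def count_transitions(values, initial=0):
    """Return running (one, zero) counts of changes into state 1 and state 0."""
    state = initial
    ones = zeros = 0
    running = []
    for value in values:
        if value != state:
            # The state changed: count the transition into the new state.
            if value == 1:
                ones += 1
            else:
                zeros += 1
            state = value
        running.append((ones, zeros))
    return running


in_order = [1, 0, 0, 1, 1, 0]   # t1..t6 as in the quoted example
shuffled = [1, 1, 0, 0, 0, 1]   # same elements, different arrival order

in_order_result = count_transitions(in_order)
shuffled_result = count_transitions(shuffled)
```

Fed in timestamp order, the final counts are (one: 2, zero: 2), matching
the expected output in the quoted example; fed the same elements in the
shuffled order, the final counts are (one: 2, zero: 1).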

>  > 2) Given that some operations are easier (or possibly only possible)
> to write when operating on ordered data, and that different runners may
> have (significantly) cheaper ways to provide this ordering than can be
> done by the user themselves, should we elevate this to a property of
> (Stateful?)DoFns that the runner can provide? I think a compelling
> argument can be made here that we should.
>
> +1
>
> Jan
>
> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> > On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>   > I don't see batch vs. streaming as part of the model. One can have
> >> microbatch, or even a runner that alternates between different modes.
> >>
> >> Although I understand motivation of this statement, this project name is
> >> "Apache Beam: An advanced unified programming model". What does the
> >> model unify, if "streaming vs. batch" is not part of the model?
> > What I mean is that streaming vs. batch is no longer part of the model
> > (or ideally API), but pushed down to be a concern of the runner
> > (executor) of the pipeline.
> >
> >
> > On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> Hi Kenn,
> >>
> >> OK, so if we introduce annotation, we can have stateful ParDo with 
> >> sorting, that would perfectly resolve my issues. I still have some doubts, 
> >> though. Let me explain. The current behavior of stateful ParDo has the 
> >> following properties:
> >>
> >>   a) might fail in batch, although runs fine in streaming (that is due to 
> >> the buffering, and unbounded lateness in batch, which was discussed back 
> >> and forth in this thread)
> >>
> >>   b) might be non deterministic (this is because the elements arrive at 
> >> somewhat random order, and even if you do the operation "assign unique ID 
> >> to elements" this might produce different results when run multiple times)
> > PCollections are *explicitly* unordered. Any operations that assume or
> > depend on a specific ordering for correctness (or determinism) must
> > provide that ordering themselves (i.e. tolerate "arbitrary shuffling
> > of inputs"). As you point out, that may be very expensive if you have
> > very hot keys with very large (unbounded) timestamp skew.
> >
> > StatefulDoFns are low-level operations that should be used with care;
> > the simpler windowing model gives determinism in the face of unordered
> > data (though late data and non-end-of-window triggering introduces
> > some of the non-determinism back in).
> >
> >> What worries me most is the property b), because it seems to me to have 
> >> serious consequences - not only that if you run twice batch pipeline you 
> >> would get different results, but even on streaming, when pipeline fails 
> >> and gets restarted from checkpoint, produced output might differ from the 
> >> previous run and data from the first run might have already been persisted 
> >> into sink. That would create somewhat messy outputs.
> > Beam has an exactly-once model. If the data was consumed, state
> > mutated, and outputs written downstream (these three are committed
> > together atomically) it will not be replayed. That does not, of
> > course, solve the non-determinism due to ordering (including the fact
> > that two operations reading the same PCollection may view different
> > ordering).
> >
> >> These two properties make me think that the current implementation is 
> >> more of a _special case_ than the general one. The general one would be 
> >> that your state doesn't have the properties to be able to tolerate 
> >> buffering problems and/or non-determinism. Which is the case where you 
> >> need sorting in both streaming and batch to be part of the model.
> >>
> >> Let me point out one more analogy - that is merging vs. non-merging 
> >> windows. The general case (merging windows) implies sorting by timestamp 
> >> in both batch case (explicit) and streaming (buffering). The special case 
> >> (non-merging windows) doesn't rely on any timestamp ordering, so the 
> >> sorting and buffering can be dropped. The underlying root cause of this is 
> >> the same for both stateful ParDo and windowing (essentially, assigning 
> >> window labels is a stateful operation when windowing function is merging).
> >>
> >> The reason for the current behavior of stateful ParDo seems to be 
> >> performance, but is it right to abandon correctness in favor of 
> >> performance? Wouldn't it be more consistent to have the default behavior 
> >> prefer correctness and when you have the specific conditions of state 
> >> function having special properties, then you can annotate your DoFn (with 
> >> something like @TimeOrderingAgnostic), which would yield a better 
> >> performance in that case?
> > There are two separable questions here.
> >
> > 1) Is it correct for a (Stateful)DoFn to assume elements are received
> > in a specific order? In the current model, it is not. Being able to
> > read, handle, and produce out-of-order data, including late data, is
> > a pretty fundamental property of distributed systems.
> >
> > 2) Given that some operations are easier (or possibly only possible)
> > to write when operating on ordered data, and that different runners
> > may have (significantly) cheaper ways to provide this ordering than
> > can be done by the user themselves, should we elevate this to a
> > property of (Stateful?)DoFns that the runner can provide? I think a
> > compelling argument can be made here that we should.
> >
> > - Robert
> >
> >
> >
> >> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
> >>
> >> Thanks for the nice small example of a calculation that depends on order. 
> >> You are right that many state machines have this property. I agree w/ you 
> >> and Luke that it is convenient for batch processing to sort by event 
> >> timestamp before running a stateful ParDo. In streaming you could also 
> >> implement "sort by event timestamp" by buffering until you know all 
> >> earlier data will be dropped - a slack buffer up to allowed lateness.
> >>
> >> I do not think that it is OK to sort in batch and not in streaming. Many 
> >> state machines diverge very rapidly when things are out of order. So each 
> >> runner if they see the "@OrderByTimestamp" annotation (or whatever) needs 
> >> to deliver sorted data (by some mix of buffering and dropping), or to 
> >> reject the pipeline as unsupported.
> >>
> >> And also want to say that this is not the default case - many uses of 
> >> state & timers in ParDo yield different results at the element level, but 
> >> the results are equivalent in the big picture. For examples such as 
> >> "assign a unique sequence number to each element" or "group into batches", 
> >> it doesn't matter exactly what the result is, only that it meets the spec. 
> >> And other cases like user funnels are monotonic enough that you also don't 
> >> actually need sorting.
> >>
> >> Kenn
> >>
> >> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>> Yes, the problem will arise probably mostly when you have not well 
> >>> distributed keys (or too few keys). I'm really not sure if a pure GBK 
> >>> with a trigger can solve this - it might help to have data driven 
> >>> trigger. There would still be some doubts, though. The main question is 
> >>> still here - people say, that sorting by timestamp before stateful ParDo 
> >>> would be prohibitively slow, but I don't really see why - the sorting is 
> >>> very probably already there. And if not (hash grouping instead of sorted 
> >>> grouping), then the sorting would affect only user defined StatefulParDos.
> >>>
> >>> This would suggest that the best way out of this would be really to add 
> >>> annotation, so that the author of the pipeline can decide.
> >>>
> >>> If that would be acceptable I think I can try to prepare some basic 
> >>> functionality, but I'm not sure, if I would be able to cover all runners 
> >>> / sdks.
> >>>
> >>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
> >>>
> >>> It is read all per key and window and not just read all (this still won't 
> >>> scale with hot keys in the global window). The GBK preceding the 
> >>> StatefulParDo will guarantee that you are processing all the values for a 
> >>> specific key and window at any given time. Is there a specific 
> >>> window/trigger that is missing that you feel would remove the need for 
> >>> you to use StatefulParDo?
> >>>
> >>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> Hi Lukasz,
> >>>>
> >>>>> Today, if you must have a strict order, you must guarantee that your 
> >>>>> StatefulParDo implements the necessary "buffering & sorting" into state.
> >>>> Yes, no problem with that. But this whole discussion started, because 
> >>>> *this doesn't work on batch*. You simply cannot first read everything 
> >>>> from distributed storage and then buffer it all into memory, just to 
> >>>> read it again, but sorted. That will not work. And even if it would, it 
> >>>> would be a terrible waste of resources.
> >>>>
> >>>> Jan
> >>>>
> >>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
> >>>>
> >>>>
> >>>>
> >>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>> This discussion brings many really interesting questions for me. :-)
> >>>>>
> >>>>>   > I don't see batch vs. streaming as part of the model. One can have
> >>>>> microbatch, or even a runner that alternates between different modes.
> >>>>>
> >>>>> Although I understand motivation of this statement, this project name is
> >>>>> "Apache Beam: An advanced unified programming model". What does the
> >>>>> model unify, if "streaming vs. batch" is not part of the model?
> >>>>>
> >>>>> Using microbatching, chaining of batch jobs, or pure streaming are
> >>>>> exactly the "runtime conditions/characteristics" I refer to. All these
> >>>>> define several runtime parameters, which in turn define how well/badly
> >>>>> will the pipeline perform and how many resources might be needed. From
> >>>>> my point of view, pure streaming should be the most resource demanding
> >>>>> (if not, why bother with batch? why not run everything in streaming
> >>>>> only? what will there remain to "unify"?).
> >>>>>
> >>>>>   > Fortunately, for batch, only the state for a single key needs to be
> >>>>> preserved at a time, rather than the state for all keys across the range
> >>>>> of skew. Of course if you have few or hot keys, one can still have
> >>>>> issues (and this is not specific to StatefulDoFns).
> >>>>>
> >>>>> Yes, but here is still the presumption that my stateful DoFn can
> >>>>> tolerate arbitrary shuffling of inputs. Let me explain the use case in
> >>>>> more detail.
> >>>>>
> >>>>> Suppose you have input stream consisting of 1s and 0s (and some key for
> >>>>> each element, which is irrelevant for the demonstration). Your task is
> >>>>> to calculate in running global window the actual number of changes
> >>>>> between state 0 and state 1 and vice versa. When the state doesn't
> >>>>> change, you don't calculate anything. If input (for given key) would be
> >>>>> (tN denotes timestamp N):
> >>>>>
> >>>>>    t1: 1
> >>>>>
> >>>>>    t2: 0
> >>>>>
> >>>>>    t3: 0
> >>>>>
> >>>>>    t4: 1
> >>>>>
> >>>>>    t5: 1
> >>>>>
> >>>>>    t6: 0
> >>>>>
> >>>>> then the output should yield (supposing that default state is zero):
> >>>>>
> >>>>>    t1: (one: 1, zero: 0)
> >>>>>
> >>>>>    t2: (one: 1, zero: 1)
> >>>>>
> >>>>>    t3: (one: 1, zero: 1)
> >>>>>
> >>>>>    t4: (one: 2, zero: 1)
> >>>>>
> >>>>>    t5: (one: 2, zero: 1)
> >>>>>
> >>>>>    t6: (one: 2, zero: 2)
> >>>>>
> >>>>> How would you implement this in current Beam semantics?
> >>>>
> >>>> I think you're saying here that I know that my input is ordered in a 
> >>>> specific way and since I assume the order when writing my pipeline I can 
> >>>> perform this optimization. But there is nothing preventing a runner from 
> >>>> noticing that you're processing in the global window with a specific type 
> >>>> of trigger and re-ordering your inputs/processing to get better 
> >>>> performance (since you can't use an AfterWatermark trigger for your 
> >>>> pipeline in streaming for the GlobalWindow).
> >>>>
> >>>> Today, if you must have a strict order, you must guarantee that your 
> >>>> StatefulParDo implements the necessary "buffering & sorting" into state. 
> >>>> I can see why you would want an annotation that says I must have 
> >>>> timestamp ordered elements, since it makes writing certain 
> >>>> StatefulParDos much easier. StatefulParDo is a low-level function, it 
> >>>> really is the "here you go and do whatever you need to but here be 
> >>>> dragons" function while windowing and triggering is meant to keep many 
> >>>> people from writing StatefulParDo in the first place.
> >>>>
> >>>>>   > Pipelines that fail in the "worst case" batch scenario are likely to
> >>>>> degrade poorly (possibly catastrophically) when the watermark falls
> >>>>> behind in streaming mode as well.
> >>>>>
> >>>>> But the worst case is defined by input of size (available resources +
> >>>>> single byte) -> pipeline fail. Although it could have finished, given
> >>>>> the right conditions.
> >>>>>
> >>>>>   > This might be reasonable, implemented by default by buffering
> >>>>> everything and releasing elements as the watermark (+lateness) advances,
> >>>>> but would likely lead to inefficient (though *maybe* easier to reason
> >>>>> about) code.
> >>>>>
> >>>>> Sure, the pipeline will be less efficient, because it would have to
> >>>>> buffer and sort the inputs. But at least it will produce correct results
> >>>>> in cases where updates to state are order-sensitive.
> >>>>>
> >>>>>   > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values):
> >>>>> [(key, value) for value in values])?
> >>>>>
> >>>>> I'd say roughly yes, but difference would be in the trigger. The trigger
> >>>>> should ideally fire as soon as watermark (+lateness) crosses element
> >>>>> with lowest timestamp in the buffer. Although this could be somehow
> >>>>> emulated by fixed trigger each X millis.
> >>>>>
> >>>>>   > Or is the underlying desire just to be able to hint to the runner
> >>>>> that the code may perform better (e.g. require less resources) as skew
> >>>>> is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>
> >>>>> No, the sorting would have to be done in streaming case as well. That is
> >>>>> an imperative of the unified model. I think it is possible to sort by
> >>>>> timestamp only in batch case (and do it for *all* batch stateful pardos
> >>>>> without annotation), or introduce annotation, but then make the same
> >>>>> guarantees for streaming case as well.
> >>>>>
> >>>>> Jan
> >>>>>
> >>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> >>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>> Hi Robert,
> >>>>>>>
> >>>>>>> yes, I think you rephrased my point - although no *explicit* guarantees
> >>>>>>> of ordering are given in either mode, there is *implicit* ordering in
> >>>>>>> streaming case that is due to nature of the processing - the difference
> >>>>>>> between watermark and timestamp of elements flowing through the pipeline
> >>>>>>> is generally low (too high difference leads to the overbuffering
> >>>>>>> problem), but there is no such bound on batch.
> >>>>>> Fortunately, for batch, only the state for a single key needs to be
> >>>>>> preserved at a time, rather than the state for all keys across the
> >>>>>> range of skew. Of course if you have few or hot keys, one can still
> >>>>>> have issues (and this is not specific to StatefulDoFns).
> >>>>>>
> >>>>>>> As a result, I see a few possible solutions:
> >>>>>>>
> >>>>>>>     - the best and most natural seems to be extension of the model, so
> >>>>>>> that it defines batch as not only "streaming pipeline executed in batch
> >>>>>>> fashion", but "pipeline with at least as good runtime characteristics as
> >>>>>>> in streaming case, executed in batch fashion", I really don't think that
> >>>>>>> there are any conflicts with the current model, or that this could
> >>>>>>> affect performance, because the required sorting (as pointed by
> >>>>>>> Aljoscha) is very probably already done during translation of stateful
> >>>>>>> pardos. Also note that this definition only affects user defined
> >>>>>>> stateful pardos
> >>>>>> I don't see batch vs. streaming as part of the model. One can have
> >>>>>> microbatch, or even a runner that alternates between different modes.
> >>>>>> The model describes what the valid outputs are given a (sometimes
> >>>>>> partial) set of inputs. It becomes really hard to define things like
> >>>>>> "as good runtime characteristics." Once you allow any
> >>>>>> out-of-orderedness, it is not very feasible to try and define (and
> >>>>>> more cheaply implement) a "upper bound" of acceptable
> >>>>>> out-of-orderedness.
> >>>>>>
> >>>>>> Pipelines that fail in the "worst case" batch scenario are likely to
> >>>>>> degrade poorly (possibly catastrophically) when the watermark falls
> >>>>>> behind in streaming mode as well.
> >>>>>>
> >>>>>>>     - another option would be to introduce annotation for DoFns (e.g.
> >>>>>>> @RequiresStableTimeCharacteristics), which would result in the sorting
> >>>>>>> in batch case - but - this extension would have to ensure the sorting in
> >>>>>>> streaming mode also - it would require definition of allowed lateness,
> >>>>>>> and trigger (essentially similar to window)
> >>>>>> This might be reasonable, implemented by default by buffering
> >>>>>> everything and releasing elements as the watermark (+lateness)
> >>>>>> advances, but would likely lead to inefficient (though *maybe* easier
> >>>>>> to reason about) code. Not sure about the semantics of triggering
> >>>>>> here, especially data-driven triggers. Would it be roughly equivalent
> >>>>>> to GBK + FlatMap(lambda (key, values): [(key, value) for value in
> >>>>>> values])?
> >>>>>>
> >>>>>> Or is the underlying desire just to be able to hint to the runner that
> >>>>>> the code may perform better (e.g. require less resources) as skew is
> >>>>>> reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>
> >>>>>>>     - last option would be to introduce these "higher order guarantees" in
> >>>>>>> some extension DSL (e.g. Euphoria), but that seems to be the worst
> >>>>>>> option to me
> >>>>>>>
> >>>>>>> I see the first two options quite equally good, although the latter one
> >>>>>>> is probably more time consuming to implement. But it would bring
> >>>>>>> additional feature to streaming case as well.
> >>>>>>>
> >>>>>>> Thanks for any thoughts.
> >>>>>>>
> >>>>>>>     Jan
> >>>>>>>
> >>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> >>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>> Hi Reuven,
> >>>>>>>>>
> >>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
> >>>>>>>>> Stateful ParDo works in batch as far as the logic inside the state 
> >>>>>>>>> works for absolutely unbounded out-of-orderness of elements. That 
> >>>>>>>>> basically (practically) can work only for cases, where the order of 
> >>>>>>>>> input elements doesn't matter. But, "state" can refer to "state 
> >>>>>>>>> machine", and any time you have a state machine involved, then the 
> >>>>>>>>> ordering of elements would matter.
> >>>>>>>> No guarantees on order are provided in *either* streaming or batch
> >>>>>>>> mode by the model. However, it is the case that in order to make
> >>>>>>>> forward progress most streaming runners attempt to limit the amount of
> >>>>>>>> out-of-orderedness of elements (in terms of event time vs. processing
> >>>>>>>> time), which in turn could help cap the amount of state that must be
> >>>>>>>> held concurrently, whereas a batch runner
> >>>>>>>> may not allow any state to be safely discarded until the whole
> >>>>>>>> timeline from infinite past to infinite future has been observed.
> >>>>>>>>
> >>>>>>>> Also, as pointed out, state is not preserved "batch to batch" in 
> >>>>>>>> batch mode.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>>>     batch semantics and streaming semantics differs only in that I 
> >>>>>>>>>> can have GlobalWindow with default trigger on batch and cannot on 
> >>>>>>>>>> stream
> >>>>>>>>> You can have a GlobalWindow in streaming with a default trigger. You
> >>>>>>>>> could define additional triggers that do early firings. And you could
> >>>>>>>>> even trigger the global window by advancing the watermark to +inf.
> >>>>>>>> IIRC, as a pragmatic note, we prohibited global window with default
> >>>>>>>> trigger on unbounded PCollections in the SDK because this is more
> >>>>>>>> likely to be user error than an actual desire to have no output until
> >>>>>>>> drain. But it's semantically valid in the model.
