Reza: One could provide something like this as a utility class, but one downside is that it is not scale invariant. It requires a tuning parameter that, if too small, won't mitigate the problem, but if too big, greatly increases latency. (Possibly one could define a dynamic session-like window to solve this though...) It also might be harder for runners that *can* cheaply present stuff in timestamp order to optimize. (That and, in practice, our annotation-style process methods don't lend themselves to easy composition.) I think it could work in specific cases though.
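[For concreteness, a minimal sketch of such a buffer-and-sort utility as a stateful DoFn in the Java SDK, assuming String/Long KVs for illustration. SLACK is the tuning parameter described above; the timer handling is deliberately simplified (a real utility would track the minimum buffered timestamp and deal with late output timestamps, which is elided here). This is not an existing Beam class.]

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    /** Buffers elements per key and re-emits them in timestamp order. */
    class SortBufferDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {
      // The tuning parameter: too small and little reordering is fixed,
      // too big and latency grows.
      private static final Duration SLACK = Duration.standardMinutes(1);

      @StateId("buffer")
      private final StateSpec<BagState<TimestampedValue<KV<String, Long>>>> bufferSpec =
          StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(
              KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          @Element KV<String, Long> element,
          @Timestamp Instant timestamp,
          @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer,
          @TimerId("flush") Timer flush) {
        buffer.add(TimestampedValue.of(element, timestamp));
        // Simplification: re-arm the timer on every element; a real utility
        // would keep the timer at (minimum buffered timestamp + SLACK).
        flush.set(timestamp.plus(SLACK));
      }

      @OnTimer("flush")
      public void onFlush(
          OnTimerContext context,
          @StateId("buffer") BagState<TimestampedValue<KV<String, Long>>> buffer) {
        // Drain the buffer in timestamp order once the watermark has passed.
        List<TimestampedValue<KV<String, Long>>> sorted = new ArrayList<>();
        buffer.read().forEach(sorted::add);
        sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
        for (TimestampedValue<KV<String, Long>> value : sorted) {
          context.outputWithTimestamp(value.getValue(), value.getTimestamp());
        }
        buffer.clear();
      }
    }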
More inline below.

On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Robert,
>
> > Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
>
> I think what you describe is a property of a runner, not of the model, right? I think if I run my pipeline on Flink I will not get this atomicity, because although Flink also uses an exactly-once model, it might write outputs multiple times.

Actually, I think it is a larger (open) question whether exactly-once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly-once infrastructure such as Flink or Dataflow.

> > 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
>
> Yes, absolutely. The argument here is not that a stateful ParDo should presume to receive elements in a specific order, but that it should _present_ them as such to the user's @ProcessElement function.

Sounds like we should make this clearer.

> > 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
>
> +1
>
> Jan
>
> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> > On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz> wrote:
> >> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
> >>
> >> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
> >
> > What I mean is that streaming vs. batch is no longer part of the model (or ideally the API), but is pushed down to be a concern of the runner (executor) of the pipeline.
> >
> > On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> Hi Kenn,
> >>
> >> OK, so if we introduce the annotation, we can have stateful ParDo with sorting; that would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties:
> >>
> >> a) it might fail in batch although it runs fine in streaming (that is due to the buffering and the unbounded lateness in batch, which was discussed back and forth in this thread)
> >>
> >> b) it might be non-deterministic (this is because the elements arrive in somewhat random order, and even if you do the operation "assign unique ID to elements", this might produce different results when run multiple times)
> >
> > PCollections are *explicitly* unordered.
> > Any operations that assume or depend on a specific ordering for correctness (or determinism) must provide that ordering themselves (i.e. tolerate "arbitrary shuffling of inputs"). As you point out, that may be very expensive if you have very hot keys with very large (unbounded) timestamp skew.
> >
> > StatefulDoFns are low-level operations that should be used with care; the simpler windowing model gives determinism in the face of unordered data (though late data and non-end-of-window triggering introduce some of the non-determinism back in).
> >
> >> What worries me most is property b), because it seems to me to have serious consequences - not only that if you run a batch pipeline twice you would get different results, but even in streaming, when the pipeline fails and gets restarted from a checkpoint, the produced output might differ from the previous run, and data from the first run might have already been persisted into the sink. That would create somewhat messy outputs.
> >
> > Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
> >
> >> These two properties make me think that the current implementation is more of a _special case_ than the general one. The general one would be that your state doesn't have the properties needed to tolerate buffering problems and/or non-determinism - which is the case where you need sorting in both streaming and batch to be part of the model.
> >>
> >> Let me point out one more analogy - that is merging vs. non-merging windows. The general case (merging windows) implies sorting by timestamp in both the batch case (explicit) and streaming (buffering). The special case (non-merging windows) doesn't rely on any timestamp ordering, so the sorting and buffering can be dropped. The underlying root cause is the same for both stateful ParDo and windowing (essentially, assigning window labels is a stateful operation when the windowing function is merging).
> >>
> >> The reason for the current behavior of stateful ParDo seems to be performance, but is it right to abandon correctness in favor of performance? Wouldn't it be more consistent to have the default behavior prefer correctness, and when your state function has the special properties, you can annotate your DoFn (with something like @TimeOrderingAgnostic), which would yield better performance in that case?
> >
> > There are two separable questions here.
> >
> > 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
> >
> > 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
> >
> > - Robert
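[To make question 2) concrete, a sketch of what such an opt-in could look like at the API surface. This is purely hypothetical - no such annotation existed in the SDK at the time of this thread, and the name below is made up; the thread floats @TimeOrderingAgnostic and @OrderByTimestamp as candidates.]

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Hypothetical marker annotation: a runner that sees it must deliver
    // elements to the annotated DoFn in event-time order (by some mix of
    // buffering and dropping), or reject the pipeline as unsupported.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface RequiresTimeSortedInput {}

[A runner would then be free to satisfy the contract however is cheapest for it - a batch runner by sorting during the existing shuffle, a streaming runner by a watermark-driven buffer.]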
> >> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
> >>
> >> Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree with you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to the allowed lateness.
> >>
> >> I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So each runner, if it sees the "@OrderByTimestamp" annotation (or whatever), needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.
> >>
> >> I also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. In examples such as "assign a unique sequence number to each element" or "group into batches", it doesn't matter exactly what the result is, only that it meets the spec. And other cases like user funnels are monotonic enough that you also don't actually need sorting.
> >>
> >> Kenn
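[A minimal sketch of Kenn's first example, assuming String-keyed input (names illustrative): which element receives which number depends on arrival order, so individual runs differ at the element level, yet every run meets the spec "each element of a key gets a distinct, consecutive number".]

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    /** Assigns each element a per-key sequence number; order-agnostic by spec. */
    class AssignSequenceFn extends DoFn<KV<String, String>, KV<String, Long>> {
      @StateId("nextSeq")
      private final StateSpec<ValueState<Long>> nextSeqSpec = StateSpecs.value();

      @ProcessElement
      public void process(
          @Element KV<String, String> element,
          @StateId("nextSeq") ValueState<Long> nextSeq,
          OutputReceiver<KV<String, Long>> out) {
        Long current = nextSeq.read();
        long seq = current == null ? 0L : current;
        // Which element gets which number depends on arrival order, but the
        // numbers are always distinct and consecutive - "equivalent in the
        // big picture".
        out.output(KV.of(element.getKey(), seq));
        nextSeq.write(seq + 1);
      }
    }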
> >> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>> Yes, the problem will probably arise mostly when you have poorly distributed keys (or too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data-driven trigger. There would still be some doubts, though. The main question is still here - people say that sorting by timestamp before a stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there. And if not (hash grouping instead of sorted grouping), then the sorting would affect only user-defined stateful ParDos.
> >>>
> >>> This would suggest that the best way out of this would really be to add the annotation, so that the author of the pipeline can decide.
> >>>
> >>> If that would be acceptable, I think I can try to prepare some basic functionality, but I'm not sure if I would be able to cover all runners / SDKs.
> >>>
> >>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
> >>>
> >>> It is read all per key and window, and not just read all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?
> >>>
> >>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> Hi Lukasz,
> >>>>
> >>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.
> >>>>
> >>>> Yes, no problem with that. But this whole discussion started because *this doesn't work in batch*. You simply cannot first read everything from distributed storage and then buffer it all into memory, just to read it again, but sorted. That will not work. And even if it would, it would be a terrible waste of resources.
> >>>>
> >>>> Jan
> >>>>
> >>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
> >>>>
> >>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>> This discussion brings many really interesting questions for me. :-)
> >>>>>
> >>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
> >>>>>
> >>>>> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
> >>>>>
> >>>>> Using microbatching, chaining of batch jobs, or pure streaming are exactly the "runtime conditions/characteristics" I refer to. All of these define several runtime parameters, which in turn define how well or badly the pipeline will perform and how many resources might be needed. From my point of view, pure streaming should be the most resource-demanding (if not, why bother with batch? why not run everything in streaming only? what would there remain to "unify"?).
> >>>>>
> >>>>> > Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
> >>>>>
> >>>>> Yes, but here is still the presumption that my stateful DoFn can tolerate arbitrary shuffling of inputs. Let me explain the use case in more detail.
> >>>>>
> >>>>> Suppose you have an input stream consisting of 1s and 0s (and some key for each element, which is irrelevant for the demonstration). Your task is to calculate, in a running global window, the actual number of changes between state 0 and state 1 and vice versa. When the state doesn't change, you don't calculate anything. If the input (for a given key) were (tN denotes timestamp N):
> >>>>>
> >>>>> t1: 1
> >>>>> t2: 0
> >>>>> t3: 0
> >>>>> t4: 1
> >>>>> t5: 1
> >>>>> t6: 0
> >>>>>
> >>>>> then the output should yield (supposing that the default state is zero):
> >>>>>
> >>>>> t1: (one: 1, zero: 0)
> >>>>> t2: (one: 1, zero: 1)
> >>>>> t3: (one: 1, zero: 1)
> >>>>> t4: (one: 2, zero: 1)
> >>>>> t5: (one: 2, zero: 1)
> >>>>> t6: (one: 2, zero: 2)
> >>>>>
> >>>>> How would you implement this in current Beam semantics?
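[For reference, a sketch of this transition counter as a stateful DoFn in the Java SDK (names illustrative). Note the caveat that drives this whole thread: it is only correct if elements reach @ProcessElement in timestamp order, which the model does not guarantee - under arbitrary shuffling the counts diverge.]

    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    /** Counts 0->1 and 1->0 transitions; correct only on timestamp-ordered input. */
    class TransitionCountFn extends DoFn<KV<String, Integer>, KV<String, String>> {
      @StateId("last")
      private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value();

      @StateId("ones")
      private final StateSpec<ValueState<Long>> onesSpec = StateSpecs.value();

      @StateId("zeros")
      private final StateSpec<ValueState<Long>> zerosSpec = StateSpecs.value();

      @ProcessElement
      public void process(
          @Element KV<String, Integer> element,
          @StateId("last") ValueState<Integer> last,
          @StateId("ones") ValueState<Long> ones,
          @StateId("zeros") ValueState<Long> zeros,
          OutputReceiver<KV<String, String>> out) {
        int previous = last.read() == null ? 0 : last.read();   // default state is 0
        long oneCount = ones.read() == null ? 0 : ones.read();
        long zeroCount = zeros.read() == null ? 0 : zeros.read();
        int current = element.getValue();
        if (current == 1 && previous == 0) {
          oneCount++;   // 0 -> 1 transition
        } else if (current == 0 && previous == 1) {
          zeroCount++;  // 1 -> 0 transition
        }
        last.write(current);
        ones.write(oneCount);
        zeros.write(zeroCount);
        out.output(KV.of(
            element.getKey(), "(one: " + oneCount + ", zero: " + zeroCount + ")"));
      }
    }

[On Jan's sample input this produces exactly the outputs listed above; fed the same elements in any other order, it produces different counts, which is property b) from earlier in the thread.]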
> >>>> I think you're saying here that you know your input is ordered in a specific way, and since you assume that order when writing your pipeline, you can perform this optimization. But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).
> >>>>
> >>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. I can see why you would want an annotation that says "I must have timestamp-ordered elements", since it makes writing certain StatefulParDos much easier. StatefulParDo is a low-level function; it really is the "here you go and do whatever you need to, but here be dragons" function, while windowing and triggering are meant to keep many people from writing StatefulParDo in the first place.
> >>>>
> >>>>> > Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
> >>>>>
> >>>>> But the worst case is defined by an input of size (available resources + a single byte) -> pipeline failure. Although it could have finished, given the right conditions.
> >>>>>
> >>>>> > This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code.
> >>>>>
> >>>>> Sure, the pipeline will be less efficient, because it would have to buffer and sort the inputs. But at least it will produce correct results in cases where updates to state are order-sensitive.
> >>>>>
> >>>>> > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
> >>>>>
> >>>>> I'd say roughly yes, but the difference would be in the trigger. The trigger should ideally fire as soon as the watermark (+lateness) crosses the element with the lowest timestamp in the buffer. Although this could somehow be emulated by a fixed trigger firing every X millis.
> >>>>>
> >>>>> > Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>
> >>>>> No, the sorting would have to be done in the streaming case as well. That is an imperative of the unified model. I think it is possible to sort by timestamp only in the batch case (and do it for *all* batch stateful ParDos without an annotation), or to introduce an annotation, but then make the same guarantees for the streaming case as well.
> >>>>>
> >>>>> Jan
> >>>>>
> >>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> >>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>> Hi Robert,
> >>>>>>>
> >>>>>>> yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in the streaming case that is due to the nature of the processing - the difference between the watermark and the timestamps of elements flowing through the pipeline is generally low (too high a difference leads to the overbuffering problem), but there is no such bound in batch.
> >>>>>>
> >>>>>> Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
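[For comparison, the "GBK + FlatMap" shape from Robert's question above, written out in the Java SDK; the pipeline scaffolding and test data are illustrative. It re-emits each key's values after the GBK, but fires with whatever trigger the windowing strategy has, rather than the "fire when the watermark crosses the lowest buffered timestamp" trigger Jan describes.]

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.FlatMapElements;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class GbkFlatMapSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        PCollection<KV<String, Long>> input =
            p.apply(Create.of(KV.of("k", 1L), KV.of("k", 2L)));
        // GBK gathers all values for a key (per window); the FlatMap then
        // re-emits them one by one.
        input
            .apply(GroupByKey.<String, Long>create())
            .apply(FlatMapElements.via(
                new SimpleFunction<KV<String, Iterable<Long>>, Iterable<KV<String, Long>>>() {
                  @Override
                  public Iterable<KV<String, Long>> apply(KV<String, Iterable<Long>> kv) {
                    List<KV<String, Long>> out = new ArrayList<>();
                    for (Long value : kv.getValue()) {
                      out.add(KV.of(kv.getKey(), value));
                    }
                    return out;
                  }
                }));
        p.run();
      }
    }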
> >>>>>>> As a result, I see a few possible solutions:
> >>>>>>>
> >>>>>>> - the best and most natural seems to be an extension of the model, so that it defines batch not only as "a streaming pipeline executed in batch fashion", but as "a pipeline with at least as good runtime characteristics as in the streaming case, executed in batch fashion". I really don't think that there are any conflicts with the current model, or that this could affect performance, because the required sorting (as pointed out by Aljoscha) is very probably already done during translation of stateful ParDos. Also note that this definition only affects user-defined stateful ParDos
> >>>>>>
> >>>>>> I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. The model describes what the valid outputs are given a (sometimes partial) set of inputs. It becomes really hard to define things like "as good runtime characteristics." Once you allow any out-of-orderedness, it is not very feasible to try and define (and more cheaply implement) an "upper bound" of acceptable out-of-orderedness.
> >>>>>>
> >>>>>> Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
> >>>>>>
> >>>>>>> - another option would be to introduce an annotation for DoFns (e.g. @RequiresStableTimeCharacteristics), which would result in the sorting in the batch case - but this extension would have to ensure the sorting in streaming mode as well - it would require a definition of allowed lateness and a trigger (essentially similar to a window)
> >>>>>>
> >>>>>> This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code. Not sure about the semantics of triggering here, especially data-driven triggers. Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
> >>>>>>
> >>>>>> Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>
> >>>>>>> - the last option would be to introduce these "higher order guarantees" in some extension DSL (e.g. Euphoria), but that seems to be the worst option to me
> >>>>>>>
> >>>>>>> I see the first two options as roughly equally good, although the latter one is probably more time-consuming to implement. But it would bring an additional feature to the streaming case as well.
> >>>>>>>
> >>>>>>> Thanks for any thoughts.
> >>>>>>>
> >>>>>>> Jan
> >>>>>>>
> >>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> >>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>> Hi Reuven,
> >>>>>>>>>
> >>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
> >>>>>>>>>
> >>>>>>>>> Stateful ParDo works in batch as far as the logic inside the state works for absolutely unbounded out-of-orderness of elements.
> >>>>>>>>> That basically (practically) can work only for cases where the order of input elements doesn't matter. But "state" can refer to a "state machine", and any time you have a state machine involved, the ordering of elements matters.
> >>>>>>>>
> >>>>>>>> No guarantees on order are provided in *either* streaming or batch mode by the model. However, it is the case that, in order to make forward progress, most streaming runners attempt to limit the amount of out-of-orderedness of elements (in terms of event time vs. processing time), which in turn could help cap the amount of state that must be held concurrently, whereas a batch runner may not allow any state to be safely discarded until the whole timeline from infinite past to infinite future has been observed.
> >>>>>>>>
> >>>>>>>> Also, as pointed out, state is not preserved "batch to batch" in batch mode.
> >>>>>>>>
> >>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>>> batch semantics and streaming semantics differ only in that I can have GlobalWindow with default trigger on batch and cannot on stream
> >>>>>>>>>
> >>>>>>>>> You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.
> >>>>>>>>
> >>>>>>>> IIRC, as a pragmatic note, we prohibited the global window with the default trigger on unbounded PCollections in the SDK because this is more likely to be user error than an actual desire to have no output until drain. But it's semantically valid in the model.
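[As a footnote to Max's point, here is what an explicit early-firing trigger on the global window looks like in the Java SDK; the processing-time trigger, one-minute delay, and accumulation mode are arbitrary choices for illustration.]

    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class GlobalWindowEarlyFiring {
      // Re-windows an unbounded input into the global window with a trigger
      // that fires repeatedly, so downstream aggregations emit early results
      // instead of waiting for the (never-arriving) end of the global window.
      static PCollection<KV<String, Long>> withEarlyFirings(
          PCollection<KV<String, Long>> input) {
        return input.apply(
            Window.<KV<String, Long>>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());
      }
    }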