So Jan's example of state machines is quite a valid use case for ordering.
However, in my experience, timestamp ordering is insufficient for state
machines. Elements that cause state transitions might come in with the
exact same timestamp, yet still have a necessary ordering. Especially given
Beam's decision to use millisecond timestamps this is possible, but even
at microsecond or nanosecond precision this can happen at scale. To handle
state machines you usually need some sort of FIFO ordering along with an
ordered source, such as Kafka, not timestamp ordering.
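
For illustration, a rough sketch of why a secondary ordering key is needed
(the Event class and its fields are made up for this example, not a Beam
API): two transitions can share the same millisecond timestamp, and only a
FIFO attribute such as the Kafka partition offset recovers their order.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    class Event {
      final long timestampMillis; // event time, millisecond precision
      final long kafkaOffset;     // FIFO position within the Kafka partition
      final String payload;

      Event(long timestampMillis, long kafkaOffset, String payload) {
        this.timestampMillis = timestampMillis;
        this.kafkaOffset = kafkaOffset;
        this.payload = payload;
      }
    }

    class OrderingDemo {
      public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        // Two state transitions arriving with the exact same timestamp.
        events.add(new Event(1000L, 43L, "CLOSE"));
        events.add(new Event(1000L, 42L, "OPEN"));
        // Sorting by timestamp alone leaves their relative order undefined;
        // the offset tie-breaker restores the source's FIFO order.
        events.sort(Comparator
            .comparingLong((Event e) -> e.timestampMillis)
            .thenComparingLong(e -> e.kafkaOffset));
        events.forEach(e ->
            System.out.println(e.timestampMillis + " " + e.payload));
      }
    }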

Reuven

On Thu, May 23, 2019 at 12:32 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi all,
>
> thanks everyone for this discussion. I think I have gathered enough
> feedback to be able to put down a proposal for changes, which I will
> do and send to this list for further discussion. There are still some
> remaining doubts about non-determinism and its relation to output
> stability vs. latency. But I will try to clarify all this in the design
> document.
>
> Thanks,
>
>   Jan
>
> On 5/22/19 3:49 PM, Maximilian Michels wrote:
> >> Someone from Flink might correct me if I'm wrong, but that's my
> >> current understanding.
> >
> > In essence your description of how exactly-once works in Flink is
> > correct. The general assumption in Flink is that pipelines must be
> > deterministic and thus produce idempotent writes in the case of
> > failures. However, that doesn't mean Beam sinks can't guarantee a bit
> > more on top of what Flink has to offer.
> >
> > Luke already mentioned the design discussions for @RequiresStableInput
> > which ensures idempotent writes for non-deterministic pipelines. This
> > is not part of the model but an optional Beam feature.
> >
> > We recently implemented support for @RequiresStableInput in the Flink
> > Runner. Reuven mentioned the Flink checkpoint confirmation, which
> > allows us to buffer (and checkpoint) processed data and only emit it
> > once a Flink checkpoint has completed.
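> >
> > For reference, a rough sketch of how a sink-style DoFn can opt in to
> > this (only a sketch; the write() call is a placeholder for whatever
> > external system is actually used):
> >
> >     import org.apache.beam.sdk.transforms.DoFn;
> >     import org.apache.beam.sdk.values.KV;
> >
> >     class StableWriteFn extends DoFn<KV<String, String>, Void> {
> >       @ProcessElement
> >       @RequiresStableInput // runner must replay identical input on retry
> >       public void processElement(ProcessContext c) {
> >         // With stable input, a retry re-issues exactly the same record,
> >         // so the external write behaves idempotently from the
> >         // pipeline's point of view.
> >         write(c.element().getKey(), c.element().getValue());
> >       }
> >
> >       private void write(String key, String value) {
> >         // placeholder for the actual external write
> >       }
> >     }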
> >
> > Cheers,
> > Max
> >
> > On 21.05.19 16:49, Jan Lukavský wrote:
> >> Hi,
> >>
> >>  > Actually, I think it is a larger (open) question whether exactly
> >> once is guaranteed by the model or whether runners are allowed to
> >> relax that. I would think, however, that sources correctly
> >> implemented should be idempotent when run atop an exactly once
> >> infrastructure such as Flink or Dataflow.
> >>
> >> I would assume that the model basically inherits the guarantees of the
> >> underlying infrastructure. Because Flink does not work as you
> >> described (atomic commit of inputs, state, and outputs), but rather a
> >> checkpoint barrier flows through the DAG much like a watermark and on
> >> failure operators are restored and data is reprocessed, it (IMHO)
> >> implies that you have exactly-once everywhere in the DAG *but* the
> >> sinks. That is because sinks cannot be restored to a previous state;
> >> instead, sinks are supposed to be idempotent in order for exactly-once
> >> to really work (or at least be able to commit outputs on checkpoint in
> >> the sink). That implies that if you don't have a sink that is able to
> >> commit outputs atomically on checkpoint, the pipeline execution should
> >> be deterministic upon retries, otherwise shadow writes from failed
> >> paths of the pipeline might appear.
> >>
> >> Someone from Flink might correct me if I'm wrong, but that's my
> >> current understanding.
> >>
> >>  > Sounds like we should make this clearer.
> >>
> >> I meant that you are right that in any thinking we do we must not
> >> forget that streams are by definition out-of-order. That is a property
> >> we cannot change. But that doesn't prevent us from creating an
> >> operator that presents the data to the UDF as if the stream were
> >> ideally sorted. It can do that by introducing latency, of course.
> >>
> >> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
> >>> Reza: One could provide something like this as a utility class, but
> >>> one downside is that it is not scale invariant. It requires a tuning
> >>> parameter that, if too small, won't mitigate the problem, but if too
> >>> big, greatly increases latency. (Possibly one could define a dynamic
> >>> session-like window to solve this though...) It also might be harder
> >>> for runners that *can* cheaply present stuff in timestamp order to
> >>> optimize. (That and, in practice, our annotation-style process methods
> >>> don't lend themselves to easy composition.) I think it could work in
> >>> specific cases though.
> >>>
> >>> More inline below.
> >>>
> >>> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> Hi Robert,
> >>>>
> >>>>   > Beam has an exactly-once model. If the data was consumed, state
> >>>> mutated, and outputs written downstream (these three are committed
> >>>> together atomically) it will not be replayed. That does not, of
> >>>> course,
> >>>> solve the non-determinism due to ordering (including the fact that two
> >>>> operations reading the same PCollection may view different ordering).
> >>>>
> >>>> I think what you describe is a property of a runner, not of the model,
> >>>> right? I think if I run my pipeline on Flink I will not get this
> >>>> atomicity, because although Flink also uses an exactly-once model, it
> >>>> might write outputs multiple times.
> >>> Actually, I think it is a larger (open) question whether exactly once
> >>> is guaranteed by the model or whether runners are allowed to relax
> >>> that. I would think, however, that sources correctly implemented
> >>> should be idempotent when run atop an exactly once infrastructure such
> >>> as Flink or Dataflow.
> >>>
> >>>>   > 1) Is it correct for a (Stateful)DoFn to assume elements are
> >>>> received
> >>>> in a specific order? In the current model, it is not. Being able to
> >>>> read, handle, and produce out-of-order data, including late data,
> >>>> is a
> >>>> pretty fundamental property of distributed systems.
> >>>>
> >>>> Yes, absolutely. The argument here is not that a Stateful ParDo should
> >>>> presume to receive elements in any particular order, but that it
> >>>> should _present_ them as such (i.e. sorted) to the user's
> >>>> @ProcessElement function.
> >>> Sounds like we should make this clearer.
> >>>
> >>>>   > 2) Given that some operations are easier (or possibly only
> >>>> possible)
> >>>> to write when operating on ordered data, and that different runners
> >>>> may
> >>>> have (significantly) cheaper ways to provide this ordering than can be
> >>>> done by the user themselves, should we elevate this to a property of
> >>>> (Stateful?)DoFns that the runner can provide? I think a compelling
> >>>> argument can be made here that we should.
> >>>>
> >>>> +1
> >>>>
> >>>> Jan
> >>>>
> >>>> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> >>>>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz>
> wrote:
> >>>>>>    > I don't see batch vs. streaming as part of the model. One
> >>>>>> can have
> >>>>>> microbatch, or even a runner that alternates between different
> >>>>>> modes.
> >>>>>>
> >>>>>> Although I understand the motivation of this statement, this
> >>>>>> project's name is
> >>>>>> "Apache Beam: An advanced unified programming model". What does the
> >>>>>> model unify, if "streaming vs. batch" is not part of the model?
> >>>>> What I mean is that streaming vs. batch is no longer part of the
> >>>>> model
> >>>>> (or ideally API), but pushed down to be a concern of the runner
> >>>>> (executor) of the pipeline.
> >>>>>
> >>>>>
> >>>>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz>
> >>>>> wrote:
> >>>>>> Hi Kenn,
> >>>>>>
> >>>>>> OK, so if we introduce an annotation, we can have stateful ParDo
> >>>>>> with sorting; that would perfectly resolve my issues. I still
> >>>>>> have some doubts, though. Let me explain. The current behavior of
> >>>>>> stateful ParDo has the following properties:
> >>>>>>
> >>>>>>    a) might fail in batch, although it runs fine in streaming (that
> >>>>>> is due to the buffering and unbounded lateness in batch, which
> >>>>>> was discussed back and forth in this thread)
> >>>>>>
> >>>>>>    b) might be non-deterministic (this is because the elements
> >>>>>> arrive in a somewhat random order, and even if you do the operation
> >>>>>> "assign unique ID to elements" this might produce different
> >>>>>> results when run multiple times)
> >>>>> PCollections are *explicitly* unordered. Any operations that
> >>>>> assume or
> >>>>> depend on a specific ordering for correctness (or determinism) must
> >>>>> provide that ordering themselves (i.e. tolerate "arbitrary shuffling
> >>>>> of inputs"). As you point out, that may be very expensive if you have
> >>>>> very hot keys with very large (unbounded) timestamp skew.
> >>>>>
> >>>>> StatefulDoFns are low-level operations that should be used with care;
> >>>>> the simpler windowing model gives determinism in the face of
> >>>>> unordered
> >>>>> data (though late data and non-end-of-window triggering introduce
> >>>>> some of the non-determinism back in).
> >>>>>
> >>>>>> What worries me most is property b), because it seems to me
> >>>>>> to have serious consequences - not only would you get different
> >>>>>> results if you ran a batch pipeline twice, but even in streaming,
> >>>>>> when the pipeline fails and gets restarted from a checkpoint, the
> >>>>>> produced output might differ from the previous run, and data from
> >>>>>> the first run might have already been persisted into the sink.
> >>>>>> That would create somewhat messy outputs.
> >>>>> Beam has an exactly-once model. If the data was consumed, state
> >>>>> mutated, and outputs written downstream (these three are committed
> >>>>> together atomically) it will not be replayed. That does not, of
> >>>>> course, solve the non-determinism due to ordering (including the fact
> >>>>> that two operations reading the same PCollection may view different
> >>>>> ordering).
> >>>>>
> >>>>>> These two properties make me think that the current
> >>>>>> implementation is more of a _special case_ than the general one.
> >>>>>> The general case would be that your state doesn't have the
> >>>>>> properties needed to tolerate buffering problems and/or
> >>>>>> non-determinism - which is the case where you need sorting in both
> >>>>>> streaming and batch to be part of the model.
> >>>>>>
> >>>>>> Let me point out one more analogy - that is merging vs.
> >>>>>> non-merging windows. The general case (merging windows) implies
> >>>>>> sorting by timestamp in both the batch case (explicitly) and
> >>>>>> streaming (by buffering). The special case (non-merging windows)
> >>>>>> doesn't rely on any timestamp ordering, so the sorting and
> >>>>>> buffering can be dropped. The underlying root cause is the same for
> >>>>>> both stateful ParDo and windowing (essentially, assigning window
> >>>>>> labels is a stateful operation when the windowing function is merging).
> >>>>>>
> >>>>>> The reason for the current behavior of stateful ParDo seems to be
> >>>>>> performance, but is it right to abandon correctness in favor of
> >>>>>> performance? Wouldn't it be more consistent to have the default
> >>>>>> behavior prefer correctness, and when you have the specific
> >>>>>> condition of the state function having special properties, you
> >>>>>> can annotate your DoFn (with something like
> >>>>>> @TimeOrderingAgnostic), which would yield better performance in
> >>>>>> that case?
> >>>>> There are two separable questions here.
> >>>>>
> >>>>> 1) Is it correct for a (Stateful)DoFn to assume elements are received
> >>>>> in a specific order? In the current model, it is not. Being able to
> >>>>> read, handle, and produce out-of-order data, including late data, is
> >>>>> a pretty fundamental property of distributed systems.
> >>>>>
> >>>>> 2) Given that some operations are easier (or possibly only possible)
> >>>>> to write when operating on ordered data, and that different runners
> >>>>> may have (significantly) cheaper ways to provide this ordering than
> >>>>> can be done by the user themselves, should we elevate this to a
> >>>>> property of (Stateful?)DoFns that the runner can provide? I think a
> >>>>> compelling argument can be made here that we should.
> >>>>>
> >>>>> - Robert
> >>>>>
> >>>>>
> >>>>>
> >>>>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
> >>>>>>
> >>>>>> Thanks for the nice small example of a calculation that depends
> >>>>>> on order. You are right that many state machines have this
> >>>>>> property. I agree w/ you and Luke that it is convenient for batch
> >>>>>> processing to sort by event timestamp before running a stateful
> >>>>>> ParDo. In streaming you could also implement "sort by event
> >>>>>> timestamp" by buffering until you know all earlier data will be
> >>>>>> dropped - a slack buffer up to allowed lateness.
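> >>>>>>
> >>>>>> (A rough sketch of that buffering pattern, heavily simplified and
> >>>>>> only illustrative: elements go into a BagState, an event-time timer
> >>>>>> is set, and once it fires the buffered elements are emitted in
> >>>>>> timestamp order. A real implementation would track the minimum
> >>>>>> buffered timestamp, add the allowed-lateness slack before flushing,
> >>>>>> and re-stamp the outputs.)
> >>>>>>
> >>>>>>     import java.util.ArrayList;
> >>>>>>     import java.util.List;
> >>>>>>     import org.apache.beam.sdk.coders.KvCoder;
> >>>>>>     import org.apache.beam.sdk.coders.StringUtf8Coder;
> >>>>>>     import org.apache.beam.sdk.coders.VarLongCoder;
> >>>>>>     import org.apache.beam.sdk.state.*;
> >>>>>>     import org.apache.beam.sdk.transforms.DoFn;
> >>>>>>     import org.apache.beam.sdk.values.KV;
> >>>>>>
> >>>>>>     class SortByTimestampFn
> >>>>>>         extends DoFn<KV<String, String>, KV<String, String>> {
> >>>>>>
> >>>>>>       // (event-time millis, original element) pairs held back
> >>>>>>       // until it is safe to emit them.
> >>>>>>       @StateId("buffer")
> >>>>>>       private final StateSpec<BagState<KV<Long, KV<String, String>>>>
> >>>>>>           bufferSpec = StateSpecs.bag(KvCoder.of(VarLongCoder.of(),
> >>>>>>               KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
> >>>>>>
> >>>>>>       @TimerId("flush")
> >>>>>>       private final TimerSpec flushSpec =
> >>>>>>           TimerSpecs.timer(TimeDomain.EVENT_TIME);
> >>>>>>
> >>>>>>       @ProcessElement
> >>>>>>       public void process(
> >>>>>>           ProcessContext c,
> >>>>>>           @StateId("buffer") BagState<KV<Long, KV<String, String>>> buffer,
> >>>>>>           @TimerId("flush") Timer flush) {
> >>>>>>         buffer.add(KV.of(c.timestamp().getMillis(), c.element()));
> >>>>>>         // Wake up once the watermark passes this element's timestamp.
> >>>>>>         flush.set(c.timestamp());
> >>>>>>       }
> >>>>>>
> >>>>>>       @OnTimer("flush")
> >>>>>>       public void onFlush(
> >>>>>>           OnTimerContext c,
> >>>>>>           @StateId("buffer") BagState<KV<Long, KV<String, String>>> buffer) {
> >>>>>>         List<KV<Long, KV<String, String>>> sorted = new ArrayList<>();
> >>>>>>         buffer.read().forEach(sorted::add);
> >>>>>>         sorted.sort((a, b) -> Long.compare(a.getKey(), b.getKey()));
> >>>>>>         // Emit in event-time order; outputs here simply take the
> >>>>>>         // timer's firing timestamp.
> >>>>>>         for (KV<Long, KV<String, String>> e : sorted) {
> >>>>>>           c.output(e.getValue());
> >>>>>>         }
> >>>>>>         buffer.clear();
> >>>>>>       }
> >>>>>>     }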
> >>>>>>
> >>>>>> I do not think that it is OK to sort in batch and not in
> >>>>>> streaming. Many state machines diverge very rapidly when things
> >>>>>> are out of order. So each runner, if it sees the
> >>>>>> "@OrderByTimestamp" annotation (or whatever), needs to deliver
> >>>>>> sorted data (by some mix of buffering and dropping), or to reject
> >>>>>> the pipeline as unsupported.
> >>>>>>
> >>>>>> And I also want to say that this is not the default case - many
> >>>>>> uses of state & timers in ParDo yield different results at the
> >>>>>> element level, but the results are equivalent in the big picture.
> >>>>>> In examples like "assign a unique sequence number to each element"
> >>>>>> or "group into batches" it doesn't matter exactly what the result
> >>>>>> is, only that it meets the spec. And other cases like user funnels
> >>>>>> are monotonic enough that you also don't actually need sorting.
> >>>>>>
> >>>>>> Kenn
> >>>>>>
> >>>>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz>
> >>>>>> wrote:
> >>>>>>> Yes, the problem will probably arise mostly when you have poorly
> >>>>>>> distributed keys (or too few keys). I'm really not sure if
> >>>>>>> a pure GBK with a trigger can solve this - it might help to have a
> >>>>>>> data-driven trigger. There would still be some doubts, though.
> >>>>>>> The main question is still here - people say that sorting by
> >>>>>>> timestamp before a stateful ParDo would be prohibitively slow, but
> >>>>>>> I don't really see why - the sorting is very probably already
> >>>>>>> there. And if not (hash grouping instead of sorted grouping),
> >>>>>>> then the sorting would affect only user-defined StatefulParDos.
> >>>>>>>
> >>>>>>> This would suggest that the best way out of this would be really
> >>>>>>> to add annotation, so that the author of the pipeline can decide.
> >>>>>>>
> >>>>>>> If that were acceptable, I think I can try to prepare some
> >>>>>>> basic functionality, but I'm not sure if I would be able to
> >>>>>>> cover all runners / SDKs.
> >>>>>>>
> >>>>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
> >>>>>>>
> >>>>>>> It is read all per key and window and not just read all (this
> >>>>>>> still won't scale with hot keys in the global window). The GBK
> >>>>>>> preceding the StatefulParDo will guarantee that you are
> >>>>>>> processing all the values for a specific key and window at any
> >>>>>>> given time. Is there a specific window/trigger that is missing
> >>>>>>> that you feel would remove the need for you to use StatefulParDo?
> >>>>>>>
> >>>>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz>
> >>>>>>> wrote:
> >>>>>>>> Hi Lukasz,
> >>>>>>>>
> >>>>>>>>> Today, if you must have a strict order, you must guarantee
> >>>>>>>>> that your StatefulParDo implements the necessary "buffering &
> >>>>>>>>> sorting" into state.
> >>>>>>>> Yes, no problem with that. But this whole discussion started
> >>>>>>>> because *this doesn't work in batch*. You simply cannot first
> >>>>>>>> read everything from distributed storage and then buffer it all
> >>>>>>>> into memory, just to read it again, but sorted. That will not
> >>>>>>>> work. And even if it did, it would be a terrible waste of
> >>>>>>>> resources.
> >>>>>>>>
> >>>>>>>> Jan
> >>>>>>>>
> >>>>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz>
> >>>>>>>> wrote:
> >>>>>>>>> This discussion brings many really interesting questions for
> >>>>>>>>> me. :-)
> >>>>>>>>>
> >>>>>>>>>    > I don't see batch vs. streaming as part of the model. One
> >>>>>>>>> can have
> >>>>>>>>> microbatch, or even a runner that alternates between different
> >>>>>>>>> modes.
> >>>>>>>>>
> >>>>>>>>> Although I understand the motivation of this statement, this
> >>>>>>>>> project's name is
> >>>>>>>>> "Apache Beam: An advanced unified programming model". What
> >>>>>>>>> does the
> >>>>>>>>> model unify, if "streaming vs. batch" is not part of the model?
> >>>>>>>>>
> >>>>>>>>> Using microbatching, chaining of batch jobs, or pure streaming
> >>>>>>>>> are exactly the "runtime conditions/characteristics" I refer to.
> >>>>>>>>> All these define several runtime parameters, which in turn define
> >>>>>>>>> how well or badly the pipeline will perform and how many
> >>>>>>>>> resources might be needed. From my point of view, pure streaming
> >>>>>>>>> should be the most resource-demanding (if not, why bother with
> >>>>>>>>> batch? why not run everything in streaming only? what would there
> >>>>>>>>> remain to "unify"?).
> >>>>>>>>>
> >>>>>>>>>    > Fortunately, for batch, only the state for a single key
> >>>>>>>>> needs to be
> >>>>>>>>> preserved at a time, rather than the state for all keys across
> >>>>>>>>> the range
> >>>>>>>>> of skew. Of course if you have few or hot keys, one can still
> >>>>>>>>> have
> >>>>>>>>> issues (and this is not specific to StatefulDoFns).
> >>>>>>>>>
> >>>>>>>>> Yes, but there is still the presumption that my stateful DoFn
> >>>>>>>>> can tolerate arbitrary shuffling of inputs. Let me explain the
> >>>>>>>>> use case in more detail.
> >>>>>>>>>
> >>>>>>>>> Suppose you have an input stream consisting of 1s and 0s (and
> >>>>>>>>> some key for each element, which is irrelevant for the
> >>>>>>>>> demonstration). Your task is to calculate, in a running global
> >>>>>>>>> window, the actual number of changes between state 0 and state 1
> >>>>>>>>> and vice versa. When the state doesn't change, you don't output
> >>>>>>>>> anything. If the input (for a given key) were (tN denotes
> >>>>>>>>> timestamp N):
> >>>>>>>>>
> >>>>>>>>>     t1: 1
> >>>>>>>>>
> >>>>>>>>>     t2: 0
> >>>>>>>>>
> >>>>>>>>>     t3: 0
> >>>>>>>>>
> >>>>>>>>>     t4: 1
> >>>>>>>>>
> >>>>>>>>>     t5: 1
> >>>>>>>>>
> >>>>>>>>>     t6: 0
> >>>>>>>>>
> >>>>>>>>> then the output should yield (supposing that default state is
> >>>>>>>>> zero):
> >>>>>>>>>
> >>>>>>>>>     t1: (one: 1, zero: 0)
> >>>>>>>>>
> >>>>>>>>>     t2: (one: 1, zero: 1)
> >>>>>>>>>
> >>>>>>>>>     t3: (one: 1, zero: 1)
> >>>>>>>>>
> >>>>>>>>>     t4: (one: 2, zero: 1)
> >>>>>>>>>
> >>>>>>>>>     t5: (one: 2, zero: 1)
> >>>>>>>>>
> >>>>>>>>>     t6: (one: 2, zero: 2)
> >>>>>>>>>
> >>>>>>>>> How would you implement this in current Beam semantics?
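> >>>>>>>>>
> >>>>>>>>> (For illustration, a rough sketch of the naive stateful ParDo -
> >>>>>>>>> names are illustrative and it only reproduces the outputs above
> >>>>>>>>> if elements happen to arrive in timestamp order, which is exactly
> >>>>>>>>> the guarantee that is missing:)
> >>>>>>>>>
> >>>>>>>>>     import org.apache.beam.sdk.coders.VarIntCoder;
> >>>>>>>>>     import org.apache.beam.sdk.coders.VarLongCoder;
> >>>>>>>>>     import org.apache.beam.sdk.state.StateSpec;
> >>>>>>>>>     import org.apache.beam.sdk.state.StateSpecs;
> >>>>>>>>>     import org.apache.beam.sdk.state.ValueState;
> >>>>>>>>>     import org.apache.beam.sdk.transforms.DoFn;
> >>>>>>>>>     import org.apache.beam.sdk.values.KV;
> >>>>>>>>>
> >>>>>>>>>     class CountTransitionsFn
> >>>>>>>>>         extends DoFn<KV<String, Integer>, KV<String, String>> {
> >>>>>>>>>
> >>>>>>>>>       @StateId("last")
> >>>>>>>>>       private final StateSpec<ValueState<Integer>> lastSpec =
> >>>>>>>>>           StateSpecs.value(VarIntCoder.of());
> >>>>>>>>>       @StateId("ones")
> >>>>>>>>>       private final StateSpec<ValueState<Long>> onesSpec =
> >>>>>>>>>           StateSpecs.value(VarLongCoder.of());
> >>>>>>>>>       @StateId("zeros")
> >>>>>>>>>       private final StateSpec<ValueState<Long>> zerosSpec =
> >>>>>>>>>           StateSpecs.value(VarLongCoder.of());
> >>>>>>>>>
> >>>>>>>>>       @ProcessElement
> >>>>>>>>>       public void process(
> >>>>>>>>>           ProcessContext c,
> >>>>>>>>>           @StateId("last") ValueState<Integer> last,
> >>>>>>>>>           @StateId("ones") ValueState<Long> ones,
> >>>>>>>>>           @StateId("zeros") ValueState<Long> zeros) {
> >>>>>>>>>         int previous = orDefault(last.read(), 0); // default state 0
> >>>>>>>>>         int current = c.element().getValue();
> >>>>>>>>>         long oneCount = orDefault(ones.read(), 0L);
> >>>>>>>>>         long zeroCount = orDefault(zeros.read(), 0L);
> >>>>>>>>>         if (current != previous) {          // a transition happened
> >>>>>>>>>           if (current == 1) { oneCount++; } else { zeroCount++; }
> >>>>>>>>>           last.write(current);
> >>>>>>>>>           ones.write(oneCount);
> >>>>>>>>>           zeros.write(zeroCount);
> >>>>>>>>>         }
> >>>>>>>>>         c.output(KV.of(c.element().getKey(),
> >>>>>>>>>             "(one: " + oneCount + ", zero: " + zeroCount + ")"));
> >>>>>>>>>       }
> >>>>>>>>>
> >>>>>>>>>       private static <T> T orDefault(T value, T dflt) {
> >>>>>>>>>         return value != null ? value : dflt;
> >>>>>>>>>       }
> >>>>>>>>>     }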
> >>>>>>>> I think you're saying here "I know that my input is ordered
> >>>>>>>> in a specific way, and since I assume that order when writing my
> >>>>>>>> pipeline I can perform this optimization". But there is nothing
> >>>>>>>> preventing a runner from noticing that you're processing in the
> >>>>>>>> global window with a specific type of trigger and re-ordering
> >>>>>>>> your inputs/processing to get better performance (since you
> >>>>>>>> can't use an AfterWatermark trigger for your pipeline in
> >>>>>>>> streaming for the GlobalWindow).
> >>>>>>>>
> >>>>>>>> Today, if you must have a strict order, you must guarantee that
> >>>>>>>> your StatefulParDo implements the necessary "buffering &
> >>>>>>>> sorting" into state. I can see why you would want an annotation
> >>>>>>>> that says "I must have timestamp-ordered elements", since it
> >>>>>>>> makes writing certain StatefulParDos much easier. StatefulParDo
> >>>>>>>> is a low-level function; it really is the "here you go and do
> >>>>>>>> whatever you need to but here be dragons" function, while
> >>>>>>>> windowing and triggering are meant to keep many people from
> >>>>>>>> writing a StatefulParDo in the first place.
> >>>>>>>>
> >>>>>>>>>    > Pipelines that fail in the "worst case" batch scenario
> >>>>>>>>> are likely to
> >>>>>>>>> degrade poorly (possibly catastrophically) when the watermark
> >>>>>>>>> falls
> >>>>>>>>> behind in streaming mode as well.
> >>>>>>>>>
> >>>>>>>>> But the worst case is defined by an input of size (available
> >>>>>>>>> resources + a single byte) -> pipeline failure. Although it
> >>>>>>>>> could have finished, given the right conditions.
> >>>>>>>>>
> >>>>>>>>>    > This might be reasonable, implemented by default by
> >>>>>>>>> buffering
> >>>>>>>>> everything and releasing elements as the watermark (+lateness)
> >>>>>>>>> advances,
> >>>>>>>>> but would likely lead to inefficient (though *maybe* easier to
> >>>>>>>>> reason
> >>>>>>>>> about) code.
> >>>>>>>>>
> >>>>>>>>> Sure, the pipeline will be less efficient, because it would
> >>>>>>>>> have to
> >>>>>>>>> buffer and sort the inputs. But at least it will produce
> >>>>>>>>> correct results
> >>>>>>>>> in cases where updates to state are order-sensitive.
> >>>>>>>>>
> >>>>>>>>>    > Would it be roughly equivalent to GBK + FlatMap(lambda
> >>>>>>>>> (key, values):
> >>>>>>>>> [(key, value) for value in values])?
> >>>>>>>>>
> >>>>>>>>> I'd say roughly yes, but the difference would be in the trigger.
> >>>>>>>>> The trigger should ideally fire as soon as the watermark
> >>>>>>>>> (+lateness) crosses the element with the lowest timestamp in the
> >>>>>>>>> buffer. Although this could be somewhat emulated by a fixed
> >>>>>>>>> trigger firing every X millis.
> >>>>>>>>>
> >>>>>>>>>    > Or is the underlying desire just to be able to hint to
> >>>>>>>>> the runner
> >>>>>>>>> that the code may perform better (e.g. require less resources)
> >>>>>>>>> as skew
> >>>>>>>>> is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>>>>
> >>>>>>>>> No, the sorting would have to be done in the streaming case as
> >>>>>>>>> well. That is an imperative of the unified model. I think it is
> >>>>>>>>> possible either to sort by timestamp only in the batch case (and
> >>>>>>>>> do it for *all* batch stateful ParDos without an annotation), or
> >>>>>>>>> to introduce an annotation, but then make the same guarantees for
> >>>>>>>>> the streaming case as well.
> >>>>>>>>>
> >>>>>>>>> Jan
> >>>>>>>>>
> >>>>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> >>>>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský
> >>>>>>>>>> <je...@seznam.cz> wrote:
> >>>>>>>>>>> Hi Robert,
> >>>>>>>>>>>
> >>>>>>>>>>> yes, I think you rephrased my point - although no *explicit*
> >>>>>>>>>>> guarantees of ordering are given in either mode, there is
> >>>>>>>>>>> *implicit* ordering in the streaming case that is due to the
> >>>>>>>>>>> nature of the processing - the difference between the watermark
> >>>>>>>>>>> and the timestamps of elements flowing through the pipeline is
> >>>>>>>>>>> generally low (too high a difference leads to the overbuffering
> >>>>>>>>>>> problem), but there is no such bound in batch.
> >>>>>>>>>> Fortunately, for batch, only the state for a single key needs
> >>>>>>>>>> to be
> >>>>>>>>>> preserved at a time, rather than the state for all keys
> >>>>>>>>>> across the
> >>>>>>>>>> range of skew. Of course if you have few or hot keys, one can
> >>>>>>>>>> still
> >>>>>>>>>> have issues (and this is not specific to StatefulDoFns).
> >>>>>>>>>>
> >>>>>>>>>>> As a result, I see a few possible solutions:
> >>>>>>>>>>>
> >>>>>>>>>>>      - the best and most natural option seems to be an
> >>>>>>>>>>> extension of the model, so that it defines batch as not only "a
> >>>>>>>>>>> streaming pipeline executed in batch fashion", but "a pipeline
> >>>>>>>>>>> with at least as good runtime characteristics as in the
> >>>>>>>>>>> streaming case, executed in batch fashion". I really don't
> >>>>>>>>>>> think that there are any conflicts with the current model, or
> >>>>>>>>>>> that this could affect performance, because the required
> >>>>>>>>>>> sorting (as pointed out by Aljoscha) is very probably already
> >>>>>>>>>>> done during translation of stateful ParDos. Also note that this
> >>>>>>>>>>> definition only affects user-defined stateful ParDos
> >>>>>>>>>> I don't see batch vs. streaming as part of the model. One can
> >>>>>>>>>> have
> >>>>>>>>>> microbatch, or even a runner that alternates between
> >>>>>>>>>> different modes.
> >>>>>>>>>> The model describes what the valid outputs are given a
> >>>>>>>>>> (sometimes
> >>>>>>>>>> partial) set of inputs. It becomes really hard to define
> >>>>>>>>>> things like
> >>>>>>>>>> "as good runtime characteristics." Once you allow any
> >>>>>>>>>> out-of-orderedness, it is not very feasible to try and define
> >>>>>>>>>> (and
> >>>>>>>>>> more cheaply implement) an "upper bound" of acceptable
> >>>>>>>>>> out-of-orderedness.
> >>>>>>>>>>
> >>>>>>>>>> Pipelines that fail in the "worst case" batch scenario are
> >>>>>>>>>> likely to
> >>>>>>>>>> degrade poorly (possibly catastrophically) when the watermark
> >>>>>>>>>> falls
> >>>>>>>>>> behind in streaming mode as well.
> >>>>>>>>>>
> >>>>>>>>>>>      - another option would be to introduce an annotation for
> >>>>>>>>>>> DoFns (e.g. @RequiresStableTimeCharacteristics), which would
> >>>>>>>>>>> result in the sorting in the batch case - but this extension
> >>>>>>>>>>> would have to ensure the sorting in streaming mode as well - it
> >>>>>>>>>>> would require a definition of allowed lateness and trigger
> >>>>>>>>>>> (essentially similar to a window)
> >>>>>>>>>> This might be reasonable, implemented by default by buffering
> >>>>>>>>>> everything and releasing elements as the watermark (+lateness)
> >>>>>>>>>> advances, but would likely lead to inefficient (though
> >>>>>>>>>> *maybe* easier
> >>>>>>>>>> to reason about) code. Not sure about the semantics of
> >>>>>>>>>> triggering
> >>>>>>>>>> here, especially data-driven triggers. Would it be roughly
> >>>>>>>>>> equivalent
> >>>>>>>>>> to GBK + FlatMap(lambda (key, values): [(key, value) for
> >>>>>>>>>> value in
> >>>>>>>>>> values])?
> >>>>>>>>>>
> >>>>>>>>>> Or is the underlying desire just to be able to hint to the
> >>>>>>>>>> runner that
> >>>>>>>>>> the code may perform better (e.g. require less resources) as
> >>>>>>>>>> skew is
> >>>>>>>>>> reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>>>>>
> >>>>>>>>>>>      - the last option would be to introduce these "higher
> >>>>>>>>>>> order guarantees" in some extension DSL (e.g. Euphoria), but
> >>>>>>>>>>> that seems to be the worst option to me
> >>>>>>>>>>>
> >>>>>>>>>>> I see the first two options as roughly equally good, although
> >>>>>>>>>>> the latter one is probably more time-consuming to implement.
> >>>>>>>>>>> But it would bring an additional feature to the streaming case
> >>>>>>>>>>> as well.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for any thoughts.
> >>>>>>>>>>>
> >>>>>>>>>>>      Jan
> >>>>>>>>>>>
> >>>>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> >>>>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský
> >>>>>>>>>>>> <je...@seznam.cz> wrote:
> >>>>>>>>>>>>> Hi Reuven,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch
> >>>>>>>>>>>>>> runners.
> >>>>>>>>>>>>> Stateful ParDo works in batch only as far as the logic inside
> >>>>>>>>>>>>> the state works for absolutely unbounded out-of-orderness
> >>>>>>>>>>>>> of elements. That basically (practically) can work only
> >>>>>>>>>>>>> for cases where the order of input elements doesn't
> >>>>>>>>>>>>> matter. But "state" can refer to a "state machine", and any
> >>>>>>>>>>>>> time you have a state machine involved, the ordering
> >>>>>>>>>>>>> of elements matters.
> >>>>>>>>>>>> No guarantees on order are provided in *either* streaming
> >>>>>>>>>>>> or batch
> >>>>>>>>>>>> mode by the model. However, it is the case that most streaming
> >>>>>>>>>>>> runners attempt to limit the amount of out-of-orderedness of
> >>>>>>>>>>>> elements (in terms of event time vs. processing time) in order
> >>>>>>>>>>>> to make forward progress, which in turn could help cap the
> >>>>>>>>>>>> amount of state that must be held concurrently, whereas a
> >>>>>>>>>>>> batch runner may not allow any state to be safely discarded
> >>>>>>>>>>>> until the whole timeline from infinite past to infinite future
> >>>>>>>>>>>> has been observed.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also, as pointed out, state is not preserved "batch to
> >>>>>>>>>>>> batch" in batch mode.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels
> >>>>>>>>>>>> <m...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>      batch semantics and streaming semantics differ only
> >>>>>>>>>>>>>> in that I can have GlobalWindow with default trigger on
> >>>>>>>>>>>>>> batch and cannot on stream
> >>>>>>>>>>>>> You can have a GlobalWindow in streaming with a default
> >>>>>>>>>>>>> trigger. You
> >>>>>>>>>>>>> could define additional triggers that do early firings.
> >>>>>>>>>>>>> And you could
> >>>>>>>>>>>>> even trigger the global window by advancing the watermark
> >>>>>>>>>>>>> to +inf.
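> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (Roughly, as an illustrative sketch of such an early-firing
> >>>>>>>>>>>>> configuration - the class and method names here are made up:)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
> >>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
> >>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.windowing.Repeatedly;
> >>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.windowing.Window;
> >>>>>>>>>>>>>     import org.apache.beam.sdk.values.PCollection;
> >>>>>>>>>>>>>     import org.joda.time.Duration;
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>     class EarlyFiringGlobalWindow {
> >>>>>>>>>>>>>       // Re-window into the global window, emitting speculative
> >>>>>>>>>>>>>       // (early) panes every minute of processing time instead of
> >>>>>>>>>>>>>       // waiting for the watermark to reach +inf.
> >>>>>>>>>>>>>       static PCollection<String> apply(PCollection<String> input) {
> >>>>>>>>>>>>>         return input.apply(
> >>>>>>>>>>>>>             Window.<String>into(new GlobalWindows())
> >>>>>>>>>>>>>                 .triggering(Repeatedly.forever(
> >>>>>>>>>>>>>                     AfterProcessingTime.pastFirstElementInPane()
> >>>>>>>>>>>>>                         .plusDelayOf(Duration.standardMinutes(1))))
> >>>>>>>>>>>>>                 .withAllowedLateness(Duration.ZERO)
> >>>>>>>>>>>>>                 .accumulatingFiredPanes());
> >>>>>>>>>>>>>       }
> >>>>>>>>>>>>>     }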
> >>>>>>>>>>>> IIRC, as a pragmatic note, we prohibited global window with
> >>>>>>>>>>>> default
> >>>>>>>>>>>> trigger on unbounded PCollections in the SDK because this
> >>>>>>>>>>>> is more
> >>>>>>>>>>>> likely to be user error than an actual desire to have no
> >>>>>>>>>>>> output until
> >>>>>>>>>>>> drain. But it's semantically valid in the model.
>
