On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
>
> Pulling out focus points:
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> 
> wrote:
> > I can't act on something yet [...] but I expect to be able to [...] at some 
> > time in the processing-time future.
>
> I like this as a clear and internally-consistent feature description. It 
> describes ProcessContinuation and those timers which serve the same purpose 
> as ProcessContinuation.
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> 
> wrote:
> > I can't think of a batch or streaming scenario where it would be correct to 
> > not wait at least that long
>
> The main reason we created timers: to take action in the absence of data. The 
> archetypal use case for processing time timers was/is "flush data from state 
> if it has been sitting there too long". For this use case, the right behavior 
> for batch is to skip the timer. It is actually basically incorrect to wait.

Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).
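
One way to picture that extra signaling information is the minimal sketch
below. It is plain Python, not Beam API: `TimerIntent`, `Timer`, and
`batch_action` are hypothetical names made up for illustration. It also
assumes that "skipping" a flush-style timer in batch means firing it without
waiting once the input is exhausted.

```python
from dataclasses import dataclass
from enum import Enum


class TimerIntent(Enum):
    # "I need to wait in case there's more data" (e.g. flush stale state).
    FLUSH_IF_IDLE = "flush_if_idle"
    # "I need to wait for something external" (e.g. quota, backoff).
    WAIT_FOR_EXTERNAL = "wait_for_external"


@dataclass(frozen=True)
class Timer:
    fire_at: float  # processing time, in seconds (hypothetical unit)
    intent: TimerIntent


def batch_action(timer: Timer, now: float, input_exhausted: bool) -> str:
    """Return "fire" or "wait" for a pending processing-time timer."""
    if timer.intent is TimerIntent.FLUSH_IF_IDLE and input_exhausted:
        # A batch runner knows no more data can arrive, so waiting is pointless.
        return "fire"
    # Otherwise only the real passage of wall-clock time satisfies the timer.
    return "fire" if now >= timer.fire_at else "wait"
```

With this split, a batch runner can say something definitive about the first
intent and still honor real waits for the second.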

> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org> wrote:
> > It doesn't require a new primitive.
>
> IMO what's being proposed *is* a new primitive. I think it is a good 
> primitive. It is the underlying primitive to ProcessContinuation. It would be 
> user-friendly as a kind of timer. But if we made this the behavior of 
> processing time timers retroactively, it would break everyone using them to 
> flush data who is also reprocessing data.
>
> There are two very different use cases ("I need to wait, and block data" vs "I 
> want to act without data, aka NOT wait for data") and I think we should serve 
> both of them, but it doesn't have to be with the same low-level feature.
>
> Kenn
>
>
> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> 
> wrote:
>>
>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org> wrote:
>> >
>> > While I'm currently on the other side of the fence, I would not be against 
>> > requiring the semantics of ProcessingTime constructs to be "must wait and 
>> > execute" as a solution, since it enables the proposed "batch" process 
>> > continuation throttling mechanism to work as hypothesized for both "batch" 
>> > and "streaming" execution.
>> >
>> > There's a lot to like, as it leans Beam further into the unification of 
>> > Batch and Stream, with one fewer exception (eg. unifies timer experience 
>> > further). It doesn't require a new primitive. It probably matches more 
>> > with user expectations anyway.
>> >
>> > It does cause looping timer execution with processing time to be a problem 
>> > for Drains however.
>>
>> I think we have a problem with looping timers plus drain (a mostly
>> streaming idea anyway) regardless.
>>
>> > I'd argue though that in the case of a drain, we could update the 
>> > semantics as "move watermark to infinity" and "existing timers are 
>> > executed, but new timers are ignored",
>>
>> I don't like the idea of dropping timers for drain. I think correct
>> handling here requires user visibility into whether a pipeline is
>> draining or not.
>>
>> > and also update the requirements around OnWindowExpiration callbacks 
>> > to be a bit more insistent on being implemented for correct execution, 
>> > which is currently the only "hard" signal to the SDK side that the 
>> > window's work is guaranteed to be over, and remaining state needs to be 
>> > addressed by the transform or be garbage collected. This remains critical 
>> > for developing a good pattern for ProcessingTime timers within a Global 
>> > Window too.
>>
>> +1
>>
>> >
>> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
>> > > Thanks for bringing this up.
>> > >
>> > > My position is that both batch and streaming should wait for
>> > > processing time timers, according to local time (with the exception of
>> > > tests that can accelerate this via faked clocks).
>> > >
>> > > Both ProcessContinuation delays and ProcessingTimeTimers are IMHO
>> > > isomorphic, and can be implemented in terms of each other (at least in
>> > > one direction, and likely the other). Both are an indication that I
>> > > can't act on something yet due to external constraints (e.g. not all
>> > > the data has been published, or I lack sufficient capacity/quota to
>> > > push things downstream) but I expect to be able to (or at least would
>> > > like to check again) at some time in the processing-time future. I
>> > > can't think of a batch or streaming scenario where it would be correct
>> > > to not wait at least that long (even in batch inputs, e.g. suppose I'm
>> > > tailing logs and was eagerly started before they were fully written,
>> > > or waiting for some kind of (non-data-dependent) quiescence or other
>> > > operation to finish).
>> > >
>> > >
>> > > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz> wrote:
>> > > >
>> > > > For me it always helps to seek analogy in our physical reality. Stream
>> > > > processing actually has quite a good analogy for both event-time and
>> > > > processing-time - the simplest model for this being relativity theory.
>> > > > Event-time is the time at which events occur _at distant locations_.
>> > > > Due to the finite and invariant speed of light (which is actually really
>> > > > involved in the explanation why any stream processing is inevitably
>> > > > unordered) these events are observed (processed) at different times
>> > > > (processing time, different for different observers). It is perfectly
>> > > > possible for an observer to observe events at a rate that is higher
>> > > > than one second per second. This also happens in reality for observers
>> > > > that travel at relativistic speeds (which might be an analogy for fast -
>> > > > batch - (re)processing). Besides the invariant speed, there is also
>> > > > another invariant - the local clock (wall time) always ticks exactly at
>> > > > the rate of one second per second, no matter what. It is not possible to
>> > > > "move faster or slower" through (local) time.
>> > > >
>> > > > In my understanding the reason why we do not put any guarantees or
>> > > > bounds on the delay of firing processing time timers is purely
>> > > > technical - the processing is (per key) single-threaded, thus any timer
>> > > > has to wait until any element processing finishes. This is only a
>> > > > consequence of a technical solution, not something fundamental.
>> > > >
>> > > > Having said that, my point is that according to the above analogy, it
>> > > > should be perfectly fine to fire processing time timers in batch based
>> > > > on (local wall) time only. There should be no way of manipulating this
>> > > > local time (excluding tests). Watermarks should be affected the same
>> > > > way as any buffering in a state that would happen in a stateful DoFn
>> > > > (i.e. a set timer holds the output watermark). We should probably pay
>> > > > attention to looping timers, but it seems possible to define a valid
>> > > > stopping condition (input watermark at infinity).
>> > > >
>> > > >   Jan
>> > > >
>> > > > On 2/22/24 19:50, Kenneth Knowles wrote:
>> > > > > Forking this thread.
>> > > > >
>> > > > > The state of processing time timers in this mode of processing is not
>> > > > > satisfactory and is discussed a lot but we should make everything
>> > > > > explicit.
>> > > > >
>> > > > > Currently, a state and timer DoFn has a number of logical watermarks:
>> > > > > (apologies for fixed width not coming through in email lists). Treat
>> > > > > timers as a back edge.
>> > > > >
>> > > > > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
>> > > > >             ^                      |
>> > > > >             |--(B)-----------------|
>> > > > >                            timers
>> > > > >
>> > > > > (A) Input Element watermark: this is the watermark that promises
>> > > > > there is no incoming element with a timestamp earlier than it. Each
>> > > > > input element's timestamp holds this watermark. Note that *event time
>> > > > > timers firing is according to this watermark*. But a runner commits
>> > > > > changes to this watermark *whenever it wants*, in a way that can be
>> > > > > consistent. So the runner can absolutely process *all* the elements
>> > > > > before advancing the watermark (A), and only afterwards start firing
>> > > > > timers.
>> > > > >
>> > > > > (B) Timer watermark: this is a watermark that promises no timer is
>> > > > > set with an output timestamp earlier than it. Each timer that has an
>> > > > > output timestamp holds this watermark. Note that timers can set new
>> > > > > timers, indefinitely, so this may never reach infinity even in a
>> > > > > drain scenario.
>> > > > >
>> > > > > (C) (derived) total input watermark: this is a watermark that is the
>> > > > > minimum of the two above, and ensures that all state for the DoFn for
>> > > > > expired windows can be GCd after calling @OnWindowExpiration.
>> > > > >
>> > > > > (D) output watermark: this is a promise that the DoFn will not output
>> > > > > earlier than the watermark. It is held by the total input watermark.
>> > > > >
>> > > > > So any timer, processing or not, holds the total input watermark
>> > > > > which prevents window GC, hence the timer must be fired. You can set
>> > > > > timers without a timestamp and they will not hold (B) hence not hold
>> > > > > the total input / GC watermark (C). Then if a timer fires for an
>> > > > > expired window, it is ignored. But in general a timer that sets an
>> > > > > output timestamp is saying that it may produce output, so it *must*
>> > > > > be fired, even in batch, for data integrity. There was a time before
>> > > > > timers had output timestamps that we said that you *always* have to
>> > > > > have an @OnWindowExpiration callback for data integrity, and
>> > > > > processing time timers could not hold the watermark. That has changed
>> > > > > now.
>> > > > >
>> > > > > One main purpose of processing time timers in streaming is to be a
>> > > > > "timeout" for data buffered in state, to eventually flush. In this
>> > > > > case the output timestamp should be the minimum of the elements in
>> > > > > state (or equivalent). In batch, of course, this kind of timer is not
>> > > > > relevant and we should definitely not wait for it, because the goal
>> > > > > is to just get through all the data. We can justify this by saying
>> > > > > that the worker really has no business having any idea what time it
>> > > > > really is, and the runner can just run the clock at whatever speed it
>> > > > > wants.
>> > > > >
>> > > > > Another purpose, brought up on the Throttle thread, is to wait or
>> > > > > back off. In this case it would be desired for the timer to actually
>> > > > > cause batch processing to pause and wait. This kind of behavior has
>> > > > > not been explored much. Notably the runner can absolutely process all
>> > > > > elements first, then start to fire any enqueued processing time
>> > > > > timers. In the same way that state in batch can just be in memory,
>> > > > > this *could* just be a call to sleep(). It all seems a bit sketchy so
>> > > > > I'd love clearer opinions.
>> > > > >
>> > > > > These two are both operational effects - as you would expect for
>> > > > > processing time timers - and they seem to be in conflict. Maybe they
>> > > > > just need different features?
>> > > > >
>> > > > > I'd love to hear some more uses of processing time timers from the
>> > > > > community.
>> > > > >
>> > > > > Kenn
>> > >
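
For reference, the watermark relationships (A) through (D) described in
Kenn's message above can be sketched in a few lines of plain Python. This is
an illustration only, not how any runner implements them: all function names
are hypothetical, timestamps are plain floats, (A) is simplified to depend
only on pending elements, and allowed lateness is ignored.

```python
import math
from typing import Iterable, Optional

INF = math.inf  # stands in for the "end of time" watermark


def input_element_watermark(pending_element_timestamps: Iterable[float]) -> float:
    # (A): no incoming element has a timestamp earlier than this value.
    return min(pending_element_timestamps, default=INF)


def timer_watermark(timer_output_timestamps: Iterable[Optional[float]]) -> float:
    # (B): only timers that carry an output timestamp hold this watermark;
    # timers set without one are represented here as None and are skipped.
    held = (t for t in timer_output_timestamps if t is not None)
    return min(held, default=INF)


def total_input_watermark(a: float, b: float) -> float:
    # (C): the minimum of (A) and (B).
    return min(a, b)


def output_watermark(c: float) -> float:
    # (D): held by (C), so it can never be ahead of it.
    return c


def window_expired(window_max_timestamp: float, c: float) -> bool:
    # State for a window can be garbage collected once (C) has passed it.
    return c > window_max_timestamp
```

In this model a looping timer that keeps setting a new timer with an output
timestamp keeps (B), and therefore (C), from ever reaching infinity, which is
the drain problem discussed earlier in the thread.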
