Forking this thread.

The state of processing time timers in this mode of processing is not
satisfactory. It has been discussed a lot, but we should make everything
explicit.

Currently, a DoFn that uses state and timers has a number of logical
watermarks (apologies for fixed width not coming through in email lists).
Treat timers as a back edge:

input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
            ^                      |
            |--(B)-----------------|
                           timers


(A) Input element watermark: this is the watermark that promises there is
no incoming element with a timestamp earlier than it. Each input element's
timestamp holds this watermark. Note that *event time timers fire
according to this watermark*. But a runner commits changes to this
watermark *whenever it wants*, as long as the result is consistent. So the
runner can absolutely process *all* the elements before advancing
watermark (A), and only afterwards start firing timers.

(B) Timer watermark: this is a watermark that promises no timer is set with
an output timestamp earlier than it. Each timer that has an output
timestamp holds this watermark. Note that timers can set new timers,
indefinitely, so this may never reach infinity even in a drain scenario.

(C) (derived) total input watermark: this is a watermark that is the
minimum of the two above, and ensures that all state for the DoFn for
expired windows can be GCd after calling @OnWindowExpiration.

(D) output watermark: this is a promise that the DoFn will not output
earlier than the watermark. It is held by the total input watermark.
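
To make the relationships between (A) through (D) concrete, here is a
minimal sketch in plain Python. It only models the definitions above and is
not Beam or runner code. The function names, END_OF_TIME, and the window
expiry check (which ignores allowed lateness) are all assumptions made for
illustration.

```python
# Toy model of the four logical watermarks; not Beam or runner code.
# All names are hypothetical and timestamps are plain numbers.

END_OF_TIME = float("inf")


def input_element_watermark(upstream_watermark, pending_element_timestamps):
    """(A): held by the timestamp of every input element not yet processed."""
    return min([upstream_watermark, *pending_element_timestamps])


def timer_watermark(timer_output_timestamps):
    """(B): held by every pending timer that has an output timestamp."""
    return min(timer_output_timestamps, default=END_OF_TIME)


def total_input_watermark(a, b):
    """(C): derived as the minimum of (A) and (B)."""
    return min(a, b)


def output_watermark(c):
    """(D): held by (C). Nothing else holds it in this sketch, so it equals (C)."""
    return c


def window_can_be_gcd(window_end, c):
    """Assumption: a window counts as expired once (C) has passed its end."""
    return c > window_end


if __name__ == "__main__":
    a = input_element_watermark(100, [40, 60])
    b = timer_watermark([30])
    c = total_input_watermark(a, b)
    print(a, b, c, output_watermark(c), window_can_be_gcd(35, c))
```

In this model a single pending timer with an output timestamp of 30 keeps
(C) at 30, so a window ending at 35 cannot be collected until that timer
has fired.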

So any timer, processing time or not, holds the total input watermark, which
prevents window GC, hence the timer must be fired. You can set timers
without a timestamp and they will not hold (B) hence not hold the total
input / GC watermark (C). Then if a timer fires for an expired window, it
is ignored. But in general a timer that sets an output timestamp is saying
that it may produce output, so it *must* be fired, even in batch, for data
integrity. Before timers had output timestamps, we said that you *always*
had to have an @OnWindowExpiration callback for data integrity, and
processing time timers could not hold the watermark. That has changed now.
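
The difference between a timer that holds (B) and one that does not can be
sketched as follows. This is a toy model in plain Python, not the Beam API:
Timer, total_input_watermark, and callback_runs are hypothetical names, and
the expiry check ignores allowed lateness.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Timer:
    window_end: float
    # None models a timer set without an output timestamp: it holds nothing.
    output_timestamp: Optional[float] = None


def total_input_watermark(element_watermark: float, timers: List[Timer]) -> float:
    """(C) = min of (A) and the holds of all pending timers (B)."""
    holds = [t.output_timestamp for t in timers if t.output_timestamp is not None]
    return min([element_watermark, *holds])


def callback_runs(timer: Timer, watermark_c: float) -> bool:
    """A timer whose window has already expired is ignored (returns False)."""
    window_expired = watermark_c > timer.window_end
    return not window_expired


if __name__ == "__main__":
    holding = Timer(window_end=10, output_timestamp=5)
    not_holding = Timer(window_end=10)
    for timer in (holding, not_holding):
        # The input elements are far past the window in both cases.
        c = total_input_watermark(50, [timer])
        print(timer, c, callback_runs(timer, c))
```

The holding timer pins (C) at its output timestamp, so its window is still
live when it fires. The timer without an output timestamp lets (C) run past
the window, and by the time it fires it is dropped.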

One main purpose of processing time timers in streaming is to be a
"timeout" for data buffered in state, to eventually flush. In this case the
output timestamp should be the minimum of the elements in state (or
equivalent). In batch, of course, this kind of timer is not relevant and we
should definitely not wait for it, because the goal is to just get through
all the data. We can justify this by saying that the worker really has no
business having any idea what time it really is, and the runner can just
run the clock at whatever speed it wants.

Another purpose, brought up on the Throttle thread, is to wait or back off.
In this case it would be desirable for the timer to actually cause batch
processing to pause and wait. This kind of behavior has not been explored
much. Notably the runner can absolutely process all elements first, then
start to fire any enqueued processing time timers. In the same way that
state in batch can just be in memory, this *could* just be a call to
sleep(). It all seems a bit sketchy so I'd love clearer opinions.
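
As a rough sketch of the two readings, here is a toy batch loop in plain
Python. It is not how any runner is implemented: run_batch, its
wait_for_timers flag, and the injectable sleep parameter are hypothetical.
The loop processes every element first and only then fires the enqueued
processing time timers, either by jumping its own clock forward (the
"timeout" reading) or by actually sleeping (the "backoff" reading).

```python
import heapq
import time


def run_batch(elements, process, wait_for_timers, sleep=time.sleep):
    """Process all elements, then fire processing time timers in order.

    process(element, set_timer) handles one element.
    set_timer(delay_seconds, callback) enqueues callback(set_timer).
    Returns the final value of the runner-controlled clock.
    """
    clock = 0.0   # processing time as the runner chooses to present it
    timers = []   # heap of (fire_time, sequence_number, callback)
    seq = 0

    def set_timer(delay_seconds, callback):
        nonlocal seq
        heapq.heappush(timers, (clock + delay_seconds, seq, callback))
        seq += 1

    for element in elements:
        process(element, set_timer)

    # Callbacks may set new timers, so this loop ends only when they stop.
    while timers:
        fire_time, _, callback = heapq.heappop(timers)
        if wait_for_timers and fire_time > clock:
            sleep(fire_time - clock)   # backoff: really pause the batch job
        clock = max(clock, fire_time)  # timeout: the clock just jumps ahead
        callback(set_timer)
    return clock


if __name__ == "__main__":
    flushed = []

    def process(element, set_timer):
        set_timer(5.0, lambda _set_timer: flushed.append(element))

    run_batch(["a", "b"], process, wait_for_timers=False)
    print(flushed)
```

The user code is identical in both modes, which is the heart of the
conflict: nothing in the timer itself says whether the wait is meaningful.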

These two are both operational effects - as you would expect for processing
time timers - and they seem to be in conflict. Maybe they just need
different features?

I'd love to hear some more uses of processing time timers from the
community.

Kenn
