Yeah, I like DelayTimer, or SleepTimer, or WaitTimer, or some such.
OutputTime is always an event-time timestamp, so it isn't even
allowed to be set outside the window (otherwise you'd end up with an
element assigned to a window that it isn't within, since
OutputTime essentially represents reserving the right to output
an element with that timestamp).
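A minimal sketch of the constraint described above, assuming a hypothetical `IntervalWindow` that covers the half-open event-time range `[start, end)` in integer milliseconds. It only illustrates the rule that an OutputTime must fall inside the window; it is not Beam's actual validation code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntervalWindow:
    """Hypothetical window covering event times in [start, end), in millis."""
    start: int
    end: int

    def max_timestamp(self) -> int:
        # Latest event time that still belongs to the window.
        return self.end - 1


def validate_output_time(window: IntervalWindow, output_time: int) -> int:
    """Reject an OutputTime that would place output outside the window."""
    if not (window.start <= output_time <= window.max_timestamp()):
        raise ValueError(
            f"output time {output_time} is outside window "
            f"[{window.start}, {window.end})")
    return output_time
```

For a one-minute window `IntervalWindow(0, 60_000)`, an OutputTime of `59_999` is accepted and `60_000` raises, because the latter belongs to the next window.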
Kenn
On Mon, Feb 26, 2024 at 3:19 PM Robert Burke <rob...@frantil.com>
wrote:
Agreed that a retroactive behavior change would be bad, even
if tied to a beam version change. I agree that it meshes well
with the general theme of State & Timers exposing underlying
primitives for implementing Windowing and similar. I'd say
the distinction between the two might be additional
complexity for users to grok, and would need to be documented
well, as both operate in the ProcessingTime domain, but
differently.
What to call this new timer then? DelayTimer?
"A DelayTimer sets an instant in ProcessingTime at which
point computations can continue. Runners will prevent the
EventTime watermark from advancing past the set OutputTime
until ProcessingTime has advanced to at least the provided
instant to execute the timer's callback. This can be used to
allow the runner to constrain pipeline throughput with user
guidance."
I'd probably add that a timer with an output time outside of
the window would not be guaranteed to fire, and that
OnWindowExpiry is the correct way to ensure cleanup occurs.
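To make the proposed DelayTimer semantics concrete, here is a toy model under stated assumptions: `DelayTimer` and `ToyRunner` are hypothetical names, processing time is a fake clock in seconds, and event time is in milliseconds. A pending timer holds the output watermark at its OutputTime until the processing-time clock reaches its instant, at which point it fires and releases the hold.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class DelayTimer:
    """Hypothetical DelayTimer: fires at a processing-time instant and holds
    the event-time watermark at output_time until it fires."""
    fire_at: float                             # processing time (seconds)
    output_time: int = field(compare=False)    # event-time hold (millis)
    name: str = field(compare=False, default="")


class ToyRunner:
    def __init__(self):
        self.now = 0.0            # fake processing-time clock
        self.input_watermark = 0  # event time (millis)
        self.pending = []         # min-heap of DelayTimers ordered by fire_at
        self.fired = []

    def set_timer(self, timer: DelayTimer) -> None:
        heapq.heappush(self.pending, timer)

    def output_watermark(self) -> int:
        # Every pending DelayTimer holds the watermark at its output_time.
        holds = [t.output_time for t in self.pending]
        return min([self.input_watermark, *holds])

    def advance_clock(self, to: float) -> None:
        self.now = to
        # Fire only the timers whose processing-time instant has been reached.
        while self.pending and self.pending[0].fire_at <= self.now:
            self.fired.append(heapq.heappop(self.pending).name)
```

The design point this illustrates is that the watermark hold, not the element flow, is what throttles downstream progress: until `advance_clock` reaches `fire_at`, the output watermark stays pinned at the timer's OutputTime.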
No solution to the Looping Timers on Drain problem here, but
I think that's ultimately an orthogonal discussion, and I will
hold back my thoughts on that for now.
This isn't a proposal, just an exploration of the solution space
for our problem. We'd want to break down exactly what is
different and what is the same for the 3 kinds of timers...
On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles
<k...@apache.org> wrote:
Pulling out focus points:
On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
<dev@beam.apache.org> wrote:
> I can't act on something yet [...] but I expect to be
> able to [...] at some time in the processing-time future.
I like this as a clear and internally-consistent feature
description. It describes ProcessContinuation and those
timers which serve the same purpose as ProcessContinuation.
On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
<dev@beam.apache.org> wrote:
> I can't think of a batch or streaming scenario where it
> would be correct to not wait at least that long
The main reason we created timers: to take action in the
absence of data. The archetypal use case for processing
time timers was/is "flush data from state if it has been
sitting there too long". For this use case, the right
behavior for batch is to skip the timer. It is actually
basically incorrect to wait.
On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
<lostl...@apache.org> wrote:
> It doesn't require a new primitive.
IMO what's being proposed *is* a new primitive. I think
it is a good primitive. It is the underlying primitive to
ProcessContinuation. It would be user-friendly as a kind
of timer. But if we made this the behavior of processing
time timers retroactively, it would break everyone using
them to flush data who is also reprocessing data.
There are two very different use cases ("I need to wait,
and block data" vs "I want to act without data, aka NOT
wait for data") and I think we should serve both of them,
but it doesn't have to be with the same low-level feature.
Kenn
On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
<dev@beam.apache.org> wrote:
On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
<lostl...@apache.org> wrote:
>
> While I'm currently on the other side of the fence,
> I would not be against changing/requiring the
> semantics of ProcessingTime constructs to be "must
> wait and execute" as such a solution, which would enable
> the proposed "batch" process continuation throttling
> mechanism to work as hypothesized for both "batch"
> and "streaming" execution.
>
> There's a lot to like, as it leans Beam further
> into the unification of Batch and Stream, with one
> fewer exception (e.g. it unifies the timer experience
> further). It doesn't require a new primitive. It
> probably matches user expectations better anyway.
>
> It does cause looping timer execution with
> processing time to be a problem for Drains, however.
I think we have a problem with looping timers plus
drain (a mostly streaming idea anyway) regardless.
> I'd argue though that in the case of a drain, we
> could update the semantics as "move watermark to
> infinity", "existing timers are executed, but new
> timers are ignored",
I don't like the idea of dropping timers for drain. I
think correct handling here requires user visibility
into whether a pipeline is draining or not.
> and ensure/and update the requirements around
> OnWindowExpiration callbacks to be a bit more
> insistent on being implemented for correct execution,
> which is currently the only "hard" signal to the SDK
> side that the window's work is guaranteed to be over,
> and remaining state needs to be addressed by the
> transform or be garbage collected. This remains
> critical for developing a good pattern for
> ProcessingTime timers within a Global Window too.
+1
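The drain policy proposed in this exchange (watermark to infinity, existing timers fire, newly set timers are dropped) can be modeled in a few lines. This is a toy sketch with hypothetical names, not runner code; it shows why the policy bounds looping timers, and also why the objection above applies: the timers a looping callback tries to set are silently discarded.

```python
from collections import deque
from typing import Callable, List, Tuple


def drain(pending: List[str],
          on_timer: Callable[[str], List[str]]) -> Tuple[List[str], List[str]]:
    """Toy model of one proposed drain policy: fire every timer that was
    already set when the drain began, but drop any timer set during drain.

    pending:  names of timers set before the drain (hypothetical).
    on_timer: callback(name) -> names of timers it tries to set.
    Returns (fired, dropped).
    """
    queue = deque(pending)
    fired, dropped = [], []
    while queue:
        name = queue.popleft()
        fired.append(name)
        # New timers are recorded as dropped instead of being enqueued,
        # which is what guarantees termination for looping timers.
        dropped.extend(on_timer(name))
    return fired, dropped
```

A looping timer whose callback always re-arms itself fires exactly once under this policy, and its re-arm is dropped without the user code being told.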
>
> On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > Thanks for bringing this up.
> >
> > My position is that both batch and streaming should wait
> > for processing time timers, according to local time (with
> > the exception of tests that can accelerate this via faked
> > clocks).
> >
> > Both ProcessContinuation delays and ProcessingTimeTimers are
> > IMHO isomorphic, and can be implemented in terms of each other
> > (at least in one direction, and likely the other). Both are an
> > indication that I can't act on something yet due to external
> > constraints (e.g. not all the data has been published, or I
> > lack sufficient capacity/quota to push things downstream) but I
> > expect to be able to (or at least would like to check again) at
> > some time in the processing-time future. I can't think of a
> > batch or streaming scenario where it would be correct to not
> > wait at least that long (even in batch inputs, e.g. suppose I'm
> > tailing logs and was eagerly started before they were fully
> > written, or waiting for some kind of (non-data-dependent)
> > quiescence or other operation to finish).
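A toy illustration of the claimed isomorphism, assuming a hypothetical `Resume` value standing in for a ProcessContinuation with a resume delay, and a single processing-time timer whose callback re-arms it. Both drivers call the same hypothetical `process_once` step and end up checking at the same processing-time instants.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Resume:
    """Hypothetical stand-in for 'resume after a processing-time delay'."""
    delay: float


def process_once(is_ready: Callable[[float], bool], now: float,
                 delay: float) -> Optional[Resume]:
    # None means "done"; Resume means "check again after `delay`".
    return None if is_ready(now) else Resume(delay)


def poll_with_continuation(is_ready, start, delay) -> List[float]:
    """Drive the step as a ProcessContinuation-style resume loop."""
    checks, now = [], start
    while True:
        checks.append(now)
        cont = process_once(is_ready, now, delay)
        if cont is None:
            return checks
        now += cont.delay


def poll_with_timer(is_ready, start, delay) -> List[float]:
    """Drive the same step as a processing-time timer that re-arms itself."""
    checks, next_fire = [], start
    while next_fire is not None:
        checks.append(next_fire)
        cont = process_once(is_ready, next_fire, delay)
        # The timer callback re-arms the timer at now + delay, or stops.
        next_fire = None if cont is None else next_fire + cont.delay
    return checks
```

If the external condition becomes true at processing time 25 and the delay is 10, both drivers check at 0, 10, 20 and 30.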
> >
> >
> > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >
> > > For me it always helps to seek an analogy in our physical
> > > reality. Stream processing actually has quite a good analogy
> > > for both event-time and processing-time, the simplest model
> > > for this being relativity theory. Event-time is the time at
> > > which events occur _at distant locations_. Due to the finite
> > > and invariant speed of light (which is actually really
> > > involved in the explanation of why any stream processing is
> > > inevitably unordered) these events are observed (processed)
> > > at different times (processing time, different for different
> > > observers). It is perfectly possible for an observer to
> > > observe events at a rate that is higher than one second per
> > > second. This also happens in reality for observers that
> > > travel at relativistic speeds (which might be an analogy for
> > > fast (batch) (re)processing). Besides the invariant speed,
> > > there is also another invariant: the local clock (wall time)
> > > always ticks at exactly one second per second, no matter
> > > what. It is not possible to "move faster or slower" through
> > > (local) time.
> > >
> > > In my understanding, the reason why we do not put any
> > > guarantees or bounds on the delay of firing processing time
> > > timers is purely technical: the processing is (per key)
> > > single-threaded, so any timer has to wait until any
> > > in-progress element processing finishes. This is only a
> > > consequence of a technical solution, not something
> > > fundamental.
> > >
> > > Having said that, my point is that according to the above
> > > analogy, it should be perfectly fine to fire processing time
> > > timers in batch based on (local wall) time only. There should
> > > be no way of manipulating this local time (excluding tests).
> > > Watermarks should be affected the same way as any buffering in
> > > state that would happen in a stateful DoFn (i.e. a set timer
> > > holds the output watermark). We should probably pay attention
> > > to looping timers, but it seems possible to define a valid
> > > stopping condition (input watermark at infinity).
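A minimal sketch of the stopping condition suggested here for looping timers, assuming a hypothetical `input_watermark_at(t)` function that reports the input watermark at processing time `t`. The callback re-arms itself every `period` seconds until the input watermark reaches infinity; `max_firings` is only a safety bound for the simulation, not part of the proposal.

```python
import math
from typing import Callable, List


def run_looping_timer(input_watermark_at: Callable[[float], float],
                      period: float,
                      max_firings: int = 10_000) -> List[float]:
    """Return the processing times at which a self-re-arming timer fires."""
    fired, now = [], 0.0
    for _ in range(max_firings):
        fired.append(now)
        if input_watermark_at(now) == math.inf:
            break          # stopping condition: no more input will arrive
        now += period      # otherwise the callback re-arms the timer
    return fired
```

If the input watermark jumps to infinity at processing time 30 and the period is 10, the timer fires at 0, 10, 20 and 30, then stops instead of looping forever.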
> > >
> > > Jan
> > >
> > > On 2/22/24 19:50, Kenneth Knowles wrote:
> > > > Forking this thread.
> > > >
> > > > The state of processing time timers in this mode of
> > > > processing is not satisfactory and is discussed a lot, but
> > > > we should make everything explicit.
> > > >
> > > > Currently, a state and timer DoFn has a number of logical
> > > > watermarks (apologies for fixed width not coming through in
> > > > email lists). Treat timers as a back edge.
> > > >
> > > > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
> > > >                 ^                |
> > > >                 |--(B)-----------|
> > > >                       timers
> > > >
> > > > (A) Input element watermark: this is the watermark that
> > > > promises there is no incoming element with a timestamp
> > > > earlier than it. Each input element's timestamp holds this
> > > > watermark. Note that *event time timers fire according to
> > > > this watermark*. But a runner commits changes to this
> > > > watermark *whenever it wants*, in a way that can be
> > > > consistent. So the runner can absolutely process *all* the
> > > > elements before advancing the watermark (A), and only
> > > > afterwards start firing timers.
> > > >
> > > > (B) Timer watermark: this is a watermark that promises no
> > > > timer is set with an output timestamp earlier than it. Each
> > > > timer that has an output timestamp holds this watermark.
> > > > Note that timers can set new timers, indefinitely, so this
> > > > may never reach infinity even in a drain scenario.
> > > >
> > > > (C) (derived) Total input watermark: this is a watermark
> > > > that is the minimum of the two above, and ensures that all
> > > > state for the DoFn for expired windows can be GC'd after
> > > > calling @OnWindowExpiration.
> > > >
> > > > (D) Output watermark: this is a promise that the DoFn will
> > > > not output earlier than the watermark. It is held by the
> > > > total input watermark.
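The relationships between watermarks (A) through (D) can be written as a small function. This is a toy model with hypothetical inputs (timestamps of not-yet-processed elements, the upstream watermark, and pending timers' output timestamps, where `None` marks a timer set without one); it simplifies (D) to be exactly equal to (C).

```python
import math
from typing import Dict, Iterable, Optional


def watermarks(pending_element_ts: Iterable[int],
               upstream_watermark: float,
               timer_output_ts: Iterable[Optional[int]]) -> Dict[str, float]:
    """Compute the four logical watermarks of a stateful DoFn (toy model)."""
    # (A) held by upstream progress and by every element not yet processed.
    a = min([upstream_watermark, *pending_element_ts])
    # (B) held by every pending timer that has an output timestamp;
    #     timers set without one (None) do not hold it.
    b = min([math.inf, *(ts for ts in timer_output_ts if ts is not None)])
    # (C) the total input / GC watermark is the minimum of the two.
    c = min(a, b)
    # (D) held by (C); in this toy model nothing else holds it.
    d = c
    return {"A": a, "B": b, "C": c, "D": d}
```

With no buffered elements, an upstream watermark of 100 and one timer holding at 40, the timer alone keeps (C) and (D) at 40.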
> > > >
> > > > So any timer, processing or not, holds the total input
> > > > watermark, which prevents window GC, hence the timer must be
> > > > fired. You can set timers without a timestamp and they will
> > > > not hold (B), hence not hold the total input / GC watermark
> > > > (C). Then if a timer fires for an expired window, it is
> > > > ignored. But in general a timer that sets an output timestamp
> > > > is saying that it may produce output, so it *must* be fired,
> > > > even in batch, for data integrity. There was a time before
> > > > timers had output timestamps when we said that you *always*
> > > > have to have an @OnWindowExpiration callback for data
> > > > integrity, and processing time timers could not hold the
> > > > watermark. That has changed now.
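A toy model of the firing rule in the paragraph above, with hypothetical names. A timer whose output timestamp lies inside the window keeps the total input watermark (C) at or below the end of the window, so the window cannot expire before that timer fires. A timer set without an output timestamp holds nothing, so in batch (input element watermark at infinity) its window may already be expired and the timer is ignored.

```python
import math
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Timer:
    fire_at: float            # processing time
    output_ts: Optional[int]  # event-time hold, or None for no hold


def gc_watermark(input_element_wm: float, timers: List[Timer]) -> float:
    # Total input watermark (C): input element watermark, held by timers.
    holds = [t.output_ts for t in timers if t.output_ts is not None]
    return min([input_element_wm, *holds])


def window_expired(window_max_ts: int, input_element_wm: float,
                   timers: List[Timer]) -> bool:
    # Window state may be GC'd (after @OnWindowExpiration) once (C) passes
    # the end of the window.
    return gc_watermark(input_element_wm, timers) > window_max_ts


def on_fire(timer: Timer, window_max_ts: int, input_element_wm: float,
            pending: List[Timer]) -> str:
    """Decide what happens when `timer` comes due (toy model)."""
    if window_expired(window_max_ts, input_element_wm, pending):
        # Only reachable for timers that did not hold (C) inside the window.
        return "ignored"
    return "fired"
```

In this model the batch case is simply `input_element_wm = math.inf`: the holding timer still fires, the non-holding one is dropped.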
> > > >
> > > > One main purpose of processing time timers in streaming is
> > > > to be a "timeout" for data buffered in state, to eventually
> > > > flush. In this case the output timestamp should be the
> > > > minimum of the elements in state (or equivalent). In batch,
> > > > of course, this kind of timer is not relevant and we should
> > > > definitely not wait for it, because the goal is to just get
> > > > through all the data. We can justify this by saying that the
> > > > worker really has no business having any idea what time it
> > > > really is, and the runner can just run the clock at whatever
> > > > speed it wants.
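A sketch of the "timeout flush" pattern under stated assumptions: `BufferingFlusher` and `run_batch` are hypothetical, the timer is armed when the first element is buffered, and its output timestamp is the minimum buffered event timestamp. In `run_batch` the runner consumes all input and then fires the pending timer immediately, which amounts to running the clock arbitrarily fast rather than waiting.

```python
from typing import List, Optional, Tuple


class BufferingFlusher:
    """Toy model: buffer elements in state and flush them when a
    processing-time timer fires. The timer's output timestamp is the
    minimum buffered event timestamp, so the watermark cannot pass data
    that is still sitting in state."""

    def __init__(self, timeout: float):
        self.timeout = timeout
        self.buffer: List[Tuple[int, str]] = []   # (event_ts, value)
        self.timer_fire_at: Optional[float] = None  # processing time
        self.flushed: List[List[str]] = []

    def process(self, event_ts: int, value: str, now: float) -> None:
        if not self.buffer:
            self.timer_fire_at = now + self.timeout  # arm on first element
        self.buffer.append((event_ts, value))

    def timer_output_ts(self) -> Optional[int]:
        return min(ts for ts, _ in self.buffer) if self.buffer else None

    def on_timer(self) -> None:
        self.flushed.append([v for _, v in self.buffer])
        self.buffer.clear()
        self.timer_fire_at = None


def run_batch(flusher: BufferingFlusher,
              elements: List[Tuple[int, str]]) -> List[List[str]]:
    # Batch: consume all input first, then fire the pending timer at once;
    # the clock is never advanced, i.e. there is no real waiting.
    for ts, v in elements:
        flusher.process(ts, v, now=0.0)
    if flusher.timer_fire_at is not None:
        flusher.on_timer()
    return flusher.flushed
```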
> > > >
> > > > Another purpose, brought up on the Throttle thread, is to
> > > > wait or back off. In this case it would be desirable for the
> > > > timer to actually cause batch processing to pause and wait.
> > > > This kind of behavior has not been explored much. Notably,
> > > > the runner can absolutely process all elements first, then
> > > > start to fire any enqueued processing time timers. In the
> > > > same way that state in batch can just be in memory, this
> > > > *could* just be a call to sleep(). It all seems a bit sketchy,
> > > > so I'd love clearer opinions.
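A sketch of the "just call sleep()" option for batch, assuming the runner has already processed all elements and holds a list of enqueued `(fire_at, name)` processing-time timers. `fire_backoff_timers` is a hypothetical helper; its `clock` and `sleep` parameters are injectable so a test can use a fake clock, and with the defaults it really waits on `time.monotonic()`.

```python
import heapq
import time
from typing import Callable, List, Tuple


def fire_backoff_timers(timers: List[Tuple[float, str]],
                        clock: Callable[[], float] = time.monotonic,
                        sleep: Callable[[float], None] = time.sleep
                        ) -> List[str]:
    """Fire enqueued processing-time timers in order, actually waiting
    (sleeping) until each one's instant arrives.

    timers: (fire_at, name) pairs, with fire_at on the same scale as clock().
    """
    heap = list(timers)
    heapq.heapify(heap)
    fired = []
    while heap:
        fire_at, name = heapq.heappop(heap)
        remaining = fire_at - clock()
        if remaining > 0:
            sleep(remaining)  # batch pauses here instead of skipping ahead
        fired.append(name)
    return fired
```

With the default clock, `fire_at` values must be expressed on the `time.monotonic()` scale, for example `time.monotonic() + 0.5`.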
> > > >
> > > > These two are both operational effects (as you would expect
> > > > for processing time timers) and they seem to be in conflict.
> > > > Maybe they just need different features?
> > > >
> > > > I'd love to hear some more uses of processing time timers
> > > > from the community.
> > > >
> > > > Kenn
> >