Yeah, I like DelayTimer, or SleepTimer, or WaitTimer, or some
such.
OutputTime is always an event-time timestamp, so it isn't
even allowed to be set outside the window (or you'd end up
with an element assigned to a window that it isn't within,
since OutputTime essentially represents reserving the right
to output an element with that timestamp).
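A toy Python model of that rule, for illustration only: `IntervalWindow`, `output_time_allowed` and `set_output_time` are hypothetical names (not Beam's API). It assumes integer millisecond timestamps and a window whose last instant is its end minus 1 ms.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntervalWindow:
    start: int  # inclusive, event-time millis
    end: int    # exclusive, event-time millis

    def max_timestamp(self) -> int:
        # The last instant the window contains (end minus 1 ms).
        return self.end - 1


def output_time_allowed(window: IntervalWindow, output_time: int) -> bool:
    """Hypothetical check: an OutputTime must fall inside the timer's window."""
    return window.start <= output_time <= window.max_timestamp()


def set_output_time(window: IntervalWindow, output_time: int) -> int:
    """Reject an OutputTime that would reserve output outside the window."""
    if not output_time_allowed(window, output_time):
        raise ValueError(
            f"OutputTime {output_time} is outside window [{window.start}, {window.end})"
        )
    return output_time
```

A timer whose OutputTime fails this check is exactly the case that would otherwise assign an element to a window that doesn't contain it.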
Kenn
On Mon, Feb 26, 2024 at 3:19 PM Robert Burke
<rob...@frantil.com> wrote:
Agreed that a retroactive behavior change would be bad,
even if tied to a Beam version change. I agree that it
meshes well with the general theme of State & Timers
exposing underlying primitives for implementing
Windowing and similar. I'd say the distinction between
the two might be additional complexity for users to
grok, and would need to be documented well, as both
operate in the ProcessingTime domain, but differently.
What to call this new timer then? DelayTimer?
"A DelayTimer sets an instant in ProcessingTime at which
point computations can continue. Runners will prevent
the EventTime watermark from advancing past the set
OutputTime until ProcessingTime has advanced to at
least the provided instant to execute the timer's
callback. This can be used to allow the runner to
constrain pipeline throughput with user guidance."
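A minimal sketch of the runner-side bookkeeping this description implies, assuming a fake processing-time clock and a single key. `DelayTimer` and `ToyRunner` are hypothetical; the only point is that a pending timer holds the event-time watermark at its OutputTime until processing time reaches `fire_at`.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class DelayTimer:
    fire_at: float                             # processing-time instant (seconds)
    output_time: float = field(compare=False)  # event-time hold on the watermark


class ToyRunner:
    """Hypothetical runner-side bookkeeping for the proposed DelayTimer."""

    def __init__(self):
        self.now = 0.0      # fake processing-time clock
        self.pending = []   # min-heap of DelayTimer ordered by fire_at

    def set_delay_timer(self, fire_at, output_time):
        heapq.heappush(self.pending, DelayTimer(fire_at, output_time))

    def output_watermark(self, input_watermark):
        # Pending DelayTimers hold the event-time watermark at their OutputTime.
        return min([input_watermark] + [t.output_time for t in self.pending])

    def advance_clock(self, to):
        """Move processing time forward and return the timers that are now due."""
        self.now = to
        fired = []
        while self.pending and self.pending[0].fire_at <= self.now:
            fired.append(heapq.heappop(self.pending))
        return fired
```

Passing `float("inf")` as the input watermark models batch after all input has been read: with a timer set at `fire_at=10.0, output_time=100`, the output watermark still cannot pass 100 until the processing-time clock reaches 10.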
I'd probably add that a timer with an output time
outside of the window would not be guaranteed to fire,
and that OnWindowExpiration is the correct way to ensure
cleanup occurs.
No solution to the Looping Timers on Drain problem here,
but I think that's ultimately an orthogonal discussion,
so I will hold back my thoughts on that for now.
This isn't a proposal, just an exploration of the solution
space for our problem. We'd want to break down exactly what
is different and what is the same across the 3 kinds of timers...
On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles
<k...@apache.org> wrote:
Pulling out focus points:
On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via
dev <dev@beam.apache.org> wrote:
> I can't act on something yet [...] but I expect to
> be able to [...] at some time in the processing-time
> future.
I like this as a clear and internally-consistent
feature description. It describes
ProcessContinuation and those timers which serve the
same purpose as ProcessContinuation.
On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via
dev <dev@beam.apache.org> wrote:
> I can't think of a batch or streaming scenario
> where it would be correct to not wait at least that long
The main reason we created timers: to take action in
the absence of data. The archetypal use case for
processing time timers was/is "flush data from state
if it has been sitting there too long". For this use
case, the right behavior for batch is to skip the
timer. It is actually basically incorrect to wait.
On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
<lostl...@apache.org> wrote:
> It doesn't require a new primitive.
IMO what's being proposed *is* a new primitive. I
think it is a good primitive. It is the underlying
primitive to ProcessContinuation. It would be
user-friendly as a kind of timer. But if we made
this the behavior of processing time timers
retroactively, it would break everyone using them to
flush data who is also reprocessing data.
There are two very different use cases ("I need to
wait, and block data" vs "I want to act without
data, aka NOT wait for data") and I think we should
serve both of them, but it doesn't have to be with
the same low-level feature.
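One way to picture serving both use cases with separate features, as a toy batch-runner policy (the `TimerKind` split and `batch_fire_times` are hypothetical, not existing Beam API): flush-style timers fire without waiting once input is exhausted, while delay-style timers fire no earlier than their processing-time instant.

```python
from enum import Enum


class TimerKind(Enum):
    FLUSH = "act without data, do not wait"  # classic processing-time timer use
    DELAY = "wait, and block data"           # hypothetical DelayTimer


def batch_fire_times(timers, now):
    """Return {name: processing time it fires} under a toy batch policy.

    timers: list of (name, kind, fire_at) tuples; fire_at is a processing-time
    instant and `now` is when the batch input was exhausted.
    """
    result = {}
    for name, kind, fire_at in timers:
        if kind is TimerKind.FLUSH:
            # The runner may run the clock as fast as it likes: no waiting.
            result[name] = now
        else:
            # A delay must actually elapse in processing time.
            result[name] = max(now, fire_at)
    return result
```

For example, `batch_fire_times([("flush", TimerKind.FLUSH, 3600.0), ("backoff", TimerKind.DELAY, 30.0)], now=5.0)` gives `{"flush": 5.0, "backoff": 30.0}`.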
Kenn
On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via
dev <dev@beam.apache.org> wrote:
On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
<lostl...@apache.org> wrote:
>
> While I'm currently on the other side of the fence, I would not be
> against changing/requiring the semantics of ProcessingTime constructs
> to be "must wait and execute" as such a solution, since it enables the
> proposed "batch" process continuation throttling mechanism to work as
> hypothesized for both "batch" and "streaming" execution.
>
> There's a lot to like, as it leans Beam further into the unification
> of Batch and Stream, with one fewer exception (e.g. it unifies the
> timer experience further). It doesn't require a new primitive. It
> probably matches user expectations better anyway.
>
> It does make looping timer execution with processing time a problem
> for Drains, however.
I think we have a problem with looping timers plus drain (a mostly
streaming idea anyway) regardless.
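To illustrate the looping-timer problem, here is a toy single-key model (hypothetical, not Beam code) in which a timer re-arms itself on every firing with a later output timestamp. Even with the input watermark at infinity, as in a drain, the timer watermark stays finite; the loop only ends under a stopping condition like the one Jan suggests further down (stop once the input watermark is at infinity). `max_firings` just caps the simulation.

```python
import math


def run_looping_timer(input_watermark, stop_when_input_done, max_firings=5, period=60):
    """Toy model of a timer that re-sets itself each time it fires.

    Returns (firings, timer_watermark). timer_watermark is the output timestamp
    still held by the pending timer (watermark (B) in Kenn's diagram), or
    math.inf when no timer is pending.
    """
    pending = 0  # output timestamp of the single pending timer
    firings = 0
    while pending is not None and firings < max_firings:
        firings += 1
        if stop_when_input_done and input_watermark == math.inf:
            pending = None              # do not re-arm: the loop terminates
        else:
            pending = pending + period  # re-arm: (B) stays finite
    timer_watermark = math.inf if pending is None else pending
    return firings, timer_watermark
```

`run_looping_timer(float("inf"), False)` returns `(5, 300)` (capped, watermark still held), while `run_looping_timer(float("inf"), True)` returns `(1, inf)`.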
> I'd argue though that in the case of a drain, we could update the
> semantics to "move the watermark to infinity; existing timers are
> executed, but new timers are ignored",
I don't like the idea of dropping timers for drain. I think correct
handling here requires user visibility into whether a pipeline is
draining or not.
> and ensure (and update) the requirements around OnWindowExpiration
> callbacks are a bit more insistent on it being implemented for correct
> execution. That callback is currently the only "hard" signal to the SDK
> side that the window's work is guaranteed to be over, and that
> remaining state needs to be addressed by the transform or be garbage
> collected. This remains critical for developing a good pattern for
> ProcessingTime timers within a Global Window too.
+1
>
> On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
> > Thanks for bringing this up.
> >
> > My position is that both batch and streaming should wait for
> > processing time timers, according to local time (with the exception
> > of tests that can accelerate this via faked clocks).
> >
> > ProcessContinuation delays and ProcessingTimeTimers are, IMHO,
> > isomorphic, and can be implemented in terms of each other (at least
> > in one direction, and likely the other). Both are an indication that
> > I can't act on something yet due to external constraints (e.g. not
> > all the data has been published, or I lack sufficient capacity/quota
> > to push things downstream) but I expect to be able to (or at least
> > would like to check again) at some time in the processing-time
> > future. I can't think of a batch or streaming scenario where it would
> > be correct to not wait at least that long (even in batch inputs, e.g.
> > suppose I'm tailing logs and was eagerly started before they were
> > fully written, or waiting for some kind of (non-data-dependent)
> > quiescence or other operation to finish).
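A sketch of the direction that is easy to show, under a fake clock: work that returns "resume after a delay" (in the Java SDK, roughly `ProcessContinuation.resume().withResumeDelay(...)`) can be driven equally well by a processing-time timer set at now + delay. Both drivers below are hypothetical toy code; they ignore watermark holds, checkpointing and splitting.

```python
import heapq


def poll_until_ready(ready_at):
    """Hypothetical work: not ready until processing time `ready_at`; retry every 10s."""
    def step(now):
        return ("done", None) if now >= ready_at else ("resume", 10.0)
    return step


def drive_with_continuation(step, start=0.0):
    """Like returning a resume-with-delay continuation: the runner re-invokes later."""
    now, calls = start, []
    while True:
        calls.append(now)
        status, delay = step(now)
        if status == "done":
            return calls
        now += delay  # fake clock: jump forward by the requested delay


def drive_with_timers(step, start=0.0):
    """Same work, expressed as a processing-time timer set at now + delay."""
    timers, calls = [start], []  # min-heap of pending timer firing instants
    while timers:
        now = heapq.heappop(timers)
        calls.append(now)
        status, delay = step(now)
        if status == "resume":
            heapq.heappush(timers, now + delay)
    return calls
```

For `poll_until_ready(25.0)`, both drivers call the work at 0, 10, 20 and 30 seconds.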
> >
> >
> > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz> wrote:
> > >
> > > For me it always helps to seek an analogy in our physical reality.
> > > Stream processing actually has quite a good analogy for both
> > > event-time and processing-time, the simplest model for this being
> > > relativity theory. Event-time is the time at which events occur _at
> > > distant locations_. Due to the finite and invariant speed of light
> > > (which is actually really involved in the explanation of why any
> > > stream processing is inevitably unordered), these events are
> > > observed (processed) at different times (processing time, different
> > > for different observers). It is perfectly possible for an observer
> > > to observe events at a rate that is higher than one second per
> > > second. This also happens in reality for observers that travel at
> > > relativistic speeds (which might be an analogy for fast, i.e. batch,
> > > (re)processing). Besides the invariant speed, there is also another
> > > invariant: the local clock (wall time) always ticks at exactly the
> > > rate of one second per second, no matter what. It is not possible to
> > > "move faster or slower" through (local) time.
> > >
> > > In my understanding, the reason why we do not put any guarantees or
> > > bounds on the delay of firing processing time timers is purely
> > > technical: the processing is (per key) single-threaded, thus any
> > > timer has to wait until any in-progress element processing
> > > finishes. This is only a consequence of a technical solution, not
> > > something fundamental.
> > >
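A small model of that technical constraint, assuming one thread per key and known busy intervals (the function name is hypothetical): a processing-time timer that comes due while an element is being processed can only fire once that processing finishes.

```python
def actual_fire_time(due, busy_intervals):
    """Earliest processing time >= due at which a single-threaded key is idle.

    busy_intervals: sorted, non-overlapping (start, end) processing-time spans,
    end exclusive, during which this key is processing an element.
    """
    t = due
    for start, end in busy_intervals:
        if start <= t < end:
            t = end  # must wait for the in-progress element to finish
    return t
```

For example, a timer due at 5 while the key is busy over [3, 8) fires at 8, or at 12 if a second element occupies [8, 12).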
> > > Having said that, my point is that according to the above analogy,
> > > it should be perfectly fine to fire processing time timers in batch
> > > based on (local wall) time only. There should be no way of
> > > manipulating this local time (excluding tests). Watermarks should be
> > > affected the same way as any buffering in a state that would happen
> > > in a stateful DoFn (i.e. a set timer holds the output watermark). We
> > > should probably pay attention to looping timers, but it seems
> > > possible to define a valid stopping condition (input watermark at
> > > infinity).
> > >
> > > Jan
> > >
> > > On 2/22/24 19:50, Kenneth Knowles wrote:
> > > > Forking this thread.
> > > >
> > > > The state of processing time timers in this mode of processing is
> > > > not satisfactory; it is discussed a lot, but we should make
> > > > everything explicit.
> > > >
> > > > Currently, a state and timer DoFn has a number of logical
> > > > watermarks (apologies for fixed width not coming through in email
> > > > lists). Treat timers as a back edge.
> > > >
> > > >   input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
> > > >               ^              |
> > > >               |--(B)---------|
> > > >                   timers
> > > >
> > > > (A) Input element watermark: this is the watermark that promises
> > > > there is no incoming element with a timestamp earlier than it.
> > > > Each input element's timestamp holds this watermark. Note that
> > > > *event time timers fire according to this watermark*. But a runner
> > > > commits changes to this watermark *whenever it wants*, in a way
> > > > that can be consistent. So the runner can absolutely process *all*
> > > > the elements before advancing the watermark (A), and only
> > > > afterwards start firing timers.
> > > >
> > > > (B) Timer watermark: this is a watermark that promises no timer is
> > > > set with an output timestamp earlier than it. Each timer that has
> > > > an output timestamp holds this watermark. Note that timers can set
> > > > new timers, indefinitely, so this may never reach infinity, even in
> > > > a drain scenario.
> > > >
> > > > (C) (derived) Total input watermark: this is a watermark that is
> > > > the minimum of the two above, and ensures that all state for the
> > > > DoFn for expired windows can be GC'd after calling
> > > > @OnWindowExpiration.
> > > >
> > > > (D) Output watermark: this is a promise that the DoFn will not
> > > > output earlier than the watermark. It is held by the total input
> > > > watermark.
> > > >
> > > > So any timer, processing or not, holds the total input watermark,
> > > > which prevents window GC, hence the timer must be fired. You can
> > > > set timers without a timestamp and they will not hold (B), hence
> > > > will not hold the total input / GC watermark (C). Then if a timer
> > > > fires for an expired window, it is ignored. But in general a timer
> > > > that sets an output timestamp is saying that it may produce output,
> > > > so it *must* be fired, even in batch, for data integrity. There was
> > > > a time, before timers had output timestamps, when we said that you
> > > > *always* have to have an @OnWindowExpiration callback for data
> > > > integrity, and processing time timers could not hold the
> > > > watermark. That has changed now.
> > > >
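A toy Python encoding of watermarks (A) through (C) and the GC rule above, assuming integer event-time timestamps and `math.inf` for a watermark that has reached infinity. The names are hypothetical; (D) is not modeled beyond the fact that it can be no later than (C).

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class Timer:
    window_end: int             # max timestamp of the timer's window
    output_time: Optional[int]  # None: timer set without an output timestamp


def timer_watermark(timers):
    """(B): earliest output timestamp held by any pending timer."""
    holds = [t.output_time for t in timers if t.output_time is not None]
    return min(holds, default=math.inf)


def total_input_watermark(input_element_wm, timers):
    """(C): the minimum of (A) and (B). The output watermark (D) is held by it."""
    return min(input_element_wm, timer_watermark(timers))


def window_expired(window_end, input_element_wm, timers):
    """Window state may be GC'd (after @OnWindowExpiration) once (C) passes its end."""
    return total_input_watermark(input_element_wm, timers) > window_end


def should_fire(timer, input_element_wm, timers):
    """A timer for an already-expired window is ignored rather than fired."""
    return not window_expired(timer.window_end, input_element_wm, timers)
```

With (A) at infinity, as at the end of a batch input, a timer with an output timestamp keeps its window alive, while a window whose only timer has no output timestamp expires, and that timer is ignored.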
> > > > One main purpose of processing time timers in streaming is to be a
> > > > "timeout" for data buffered in state, to eventually flush. In this
> > > > case the output timestamp should be the minimum of the timestamps
> > > > of the elements in state (or equivalent). In batch, of course, this
> > > > kind of timer is not relevant and we should definitely not wait for
> > > > it, because the goal is to just get through all the data. We can
> > > > justify this by saying that the worker really has no business
> > > > having any idea what time it really is, and the runner can just
> > > > run the clock at whatever speed it wants.
> > > >
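A minimal sketch of the flush-on-timeout pattern, assuming a single key and window; `BufferingFn` is a hypothetical stand-in for a stateful DoFn, not Beam's API. Each buffered element can lower the timer's output timestamp to the minimum buffered event timestamp, so the output watermark is held no later than the earliest element that will be flushed.

```python
class BufferingFn:
    """Toy stand-in for a stateful DoFn that buffers elements and flushes on a
    processing-time timeout. Names are hypothetical, not Beam's API."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.buffer = []  # (event_timestamp, value) pairs held in state
        self.timer = None  # (fire_at_processing_time, output_timestamp)

    def process(self, ts, value, now):
        self.buffer.append((ts, value))
        # Hold the output watermark at the earliest buffered element.
        output_ts = min(t for t, _ in self.buffer)
        # Keep the original deadline if a timer is already pending.
        fire_at = self.timer[0] if self.timer else now + self.timeout
        self.timer = (fire_at, output_ts)

    def on_timer(self):
        """Flush everything buffered, emitting at the held output timestamp."""
        _, output_ts = self.timer
        flushed = [v for _, v in self.buffer]
        self.buffer, self.timer = [], None
        return output_ts, flushed
```

Here the timeout is measured from the first buffered element (later elements keep the original `fire_at`); resetting it on each element would be the other obvious choice.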
> > > > Another purpose, brought up on the Throttle thread, is to wait or
> > > > back off. In this case it would be desirable for the timer to
> > > > actually cause batch processing to pause and wait. This kind of
> > > > behavior has not been explored much. Notably, the runner can
> > > > absolutely process all elements first, then start to fire any
> > > > enqueued processing time timers. In the same way that state in
> > > > batch can just be in memory, this *could* just be a call to
> > > > sleep(). It all seems a bit sketchy, so I'd love clearer opinions.
> > > >
> > > > These two are both operational effects, as you would expect for
> > > > processing time timers, and they seem to be in conflict. Maybe
> > > > they just need different features?
> > > >
> > > > I'd love to hear some more uses of processing time timers from the
> > > > community.
> > > >
> > > > Kenn
> >