On 2/27/24 19:30, Robert Bradshaw via dev wrote:
On Tue, Feb 27, 2024 at 7:44 AM Robert Burke <rob...@frantil.com> wrote:
An "as fast as it can runner" with dynamic splits, would ultimately split to the systems 
maximum available parallelism (for stateful DoFns, this is the number of keys; for SplittableDoFns, 
this is the maximum sharding of each input element's restriction. That's what would happen with a 
"normal" sleep.

WRT Portability, this means adding a current ProcessingTime field to the 
ProcessBundleRequest, and likely also to the ProgressRequest so the runner could 
coordinate. ProgressResponse may then need a "asleepUntil" field to communicate 
back the state of the bundle, which the runner could then use to better time its next 
ProgressRequest, and potentially arrest dynamic splitting for that bundle. After all, the 
sleeping bundle is blocked until processing time has advanced anyway; no progress can be 
made.

I like moving the abstraction out of the timer space, as it better aligns with 
user intent for the throttle case, and it doesn't require a Stateful DoFn to 
operate (orthogonal!), meaning it's useful for It also solves the testing issue 
WRT ProcessingTime timers using an absolute time, rather than a relative time, 
as the SDK can rebuild it's relative setters for output time on the new 
canonical processing time, without user code changing.

The sleeping inprogress bundle naturally holds back the watermark too.

I suspect this mechanism would end up tending to over throttle as Reuven 
described earlier, since the user is only pushing back on immediate processing 
for the current element, not necessarily all elements. This is particularly 
likely if there's a long gap between ProgressRequests for the bundle and the 
runner doesn't adapt it's cadence.

An external source of rate doesn't really exist, other than some external 
source that can provide throttle information. There would remain time skew 
between the runner system and the external system though, but for a throttle 
that's likely fine.

A central notion of ProcessingTime also allows the runner to "smear" processing 
time so if there's a particularly long delay, it doesn't need to catch up at once. I 
don't think that's relevant for the throttle case though, since with the described clock 
mechanism and the communication back to the runner, the unblocking notion is probably 
fine.
On this note, I have become skeptical that a global throttling rate
can be done well with local information.

For streaming dataflow, we can have an approximate solution by knowing
the number of keys and doing per-key throttling because keys (at least
up to hundreds per worker) are all processed concurrently. This
solution doesn't even require state + timers and would best be done by
standard sleeps.

For most other systems, including dataflow batch, this would massively
under throttle. Here we need to either add something to the model, or
do something outside the model, to discover, dynamically, how many
siblings are being concurrently run. (This could be done at a
worker/process level, rather than bundle level, as well.) The ability
to broadcast, aggregate, and read dynamic, provisional from all
workers could help in other cases too (e.g. a more efficient top N),
but this is a whole new thread...

So while I think the semantics of processing timers in batch is worth
solving, this probably isn't the best application.
Yes, it seems that under the assumption of dynamic parallelism defined by runner defining global throttling rate is not possible under the current model. But maybe (rather than introducing a whole new concept) we could propagate the informatoin about current parallelism from runner to DoFn via ProcessContext? For some runners that would be as easy as returning a constant. Dynamic runners would be more involved, but the only other option than propagaring parallelism from runner to workers seems to be introduction of a whole new worker <-> runner communication channel, so that worker could ask runner for a permission to proceed with processing data based on some (global) condition. It feels somewhat too complex given the motivating example. Maybe there could be others so that this could be generalized to a concept, what comes to mind is something Flink calls "watermark alignment", which throttles sources based on the event-time progress of individual partitions, so that partitions that are too ahead of time do not blow up downstream state. These might be related concepts.

We'd need a discussion of what an SDK must do if the runner doesn't support the 
central clock for completeness, and consistency.


On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský <je...@seznam.cz> wrote:
On 2/27/24 14:51, Kenneth Knowles wrote:

I very much like the idea of processing time clock as a parameter to 
@ProcessElement. That will be obviously useful and remove a source of 
inconsistency, in addition to letting the runner/SDK harness control it. I also 
like the idea of passing a Sleeper or to @ProcessElement. These are both good 
practices for testing and flexibility and runner/SDK language differences.

In your (a) (b) (c) can you be more specific about which watermarks you are 
referring to? Are they the same as in my opening email? If so, then what you 
describe is what we already have.

Yes, we have that for streaming, but it does not work this way in batch. In my 
understanding we violate (a), we ignore (b) because we fire timers at GC time 
only and (c) is currently relevant only immediately preceding window GC time, 
but can be defined more generally. But essentially yes, I was just trying to 
restate the streaming processing time semantics in the limited batch case.


Kenn

On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský <je...@seznam.cz> wrote:
I think that before we introduce a possibly somewhat duplicate new feature we 
should be certain that it is really semantically different. I'll rephrase the 
two cases:

  a) need to wait and block data (delay) - the use case is the motivating 
example of Throttle transform

  b) act without data, not block

Provided we align processing time with local machine clock (or better, because 
of testing, make current processing time available via context to 
@ProcessElement) it seems to possble to unify both cases under slightly updated 
semantics of processing time timer in batch:

  a) processing time timers fire with best-effort, i.e. trying to minimize 
delay between firing timestamp and timer's timestamp
  b) timer is valid only in the context of current key-window, once watermark 
passes window GC time for the particular window that created the timer, it is 
ignored
  c) if timer has output timestamp, this timestamp holds watermark (but this is 
currently probably noop, because runners currently do no propagate (per-key) 
watermark in batch, I assume)

In case b) there might be needed to distinguish cases when timer has output 
timestamp, if so, it probably should be taken into account.

Now, such semantics should be quite aligned with what we do in streaming case and 
what users generally expect. The blocking part can be implemented in 
@ProcessElement using buffer & timer, once there is need to wait, it can be 
implemented in user code using plain sleep(). That is due to the alignment between 
local time and definition of processing time. If we had some reason to be able to 
run faster-than-wall-clock (as I'm still not in favor of that), we could do that 
using ProcessContext.sleep(). Delaying processing in the @ProcessElement should 
result in backpressuring and backpropagation of this backpressure from the Throttle 
transform to the sources as mentioned (of course this is only for the streaming 
case).

Is there anything missing in such definition that would still require splitting 
the timers into two distinct features?

  Jan

On 2/26/24 21:22, Kenneth Knowles wrote:

Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.

OutputTime is always an event time timestamp so it isn't even allowed to be set 
outside the window (or you'd end up with an element assigned to a window that 
it isn't within, since OutputTime essentially represents reserving the right to 
output an element with that timestamp)

Kenn

On Mon, Feb 26, 2024 at 3:19 PM Robert Burke <rob...@frantil.com> wrote:
Agreed that a retroactive behavior change would be bad, even if tied to a beam 
version change. I agree that it meshes well with the general theme of State & 
Timers exposing underlying primitives for implementing Windowing and similar. I'd 
say the distinction between the two might be additional complexity for users to 
grok, and would need to be documented well, as both operate in the ProcessingTime 
domain, but differently.

What to call this new timer then? DelayTimer?

"A DelayTimer sets an instant in ProcessingTime at which point computations can 
continue. Runners will prevent the EventTimer watermark from advancing past the set 
OutputTime until Processing Time has advanced to at least the provided instant to execute 
the timers callback. This can be used to allow the runner to constrain pipeline 
throughput with user guidance."

I'd probably add that a timer with an output time outside of the window would 
not be guaranteed to fire, and that OnWindowExpiry is the correct way to ensure 
cleanup occurs.

No solution to the Looping Timers on Drain problem here, but i think that's 
ultimately an orthogonal discussion, and will restrain my thoughts on that for 
now.

This isn't a proposal, but exploring the solution space within our problem. 
We'd want to break down exactly what different and the same for the 3 kinds of 
timers...




On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> 
wrote:
I can't act on something yet [...] but I expect to be able to [...] at some 
time in the processing-time future.
I like this as a clear and internally-consistent feature description. It 
describes ProcessContinuation and those timers which serve the same purpose as 
ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> 
wrote:
I can't think of a batch or streaming scenario where it would be correct to not 
wait at least that long
The main reason we created timers: to take action in the absence of data. The archetypal 
use case for processing time timers was/is "flush data from state if it has been 
sitting there too long". For this use case, the right behavior for batch is to skip 
the timer. It is actually basically incorrect to wait.

On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org> wrote:
It doesn't require a new primitive.
IMO what's being proposed *is* a new primitive. I think it is a good primitive. 
It is the underlying primitive to ProcessContinuation. It would be 
user-friendly as a kind of timer. But if we made this the behavior of 
processing time timers retroactively, it would break everyone using them to 
flush data who is also reprocessing data.

There's two very different use cases ("I need to wait, and block data" vs "I want to 
act without data, aka NOT wait for data") and I think we should serve both of them, but it 
doesn't have to be with the same low-level feature.

Kenn


On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> 
wrote:
On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org> wrote:
While I'm currently on the other side of the fence, I would not be against changing/requiring the semantics of 
ProcessingTime constructs to be "must wait and execute" as such a solution, and enables the Proposed 
"batch" process continuation throttling mechanism to work as hypothesized for both "batch" and 
"streaming" execution.

There's a lot to like, as it leans Beam further into the unification of Batch 
and Stream, with one fewer exception (eg. unifies timer experience further). It 
doesn't require a new primitive. It probably matches more with user 
expectations anyway.

It does cause looping timer execution with processing time to be a problem for 
Drains however.
I think we have a problem with looping timers plus drain (a mostly
streaming idea anyway) regardless.

I'd argue though that in the case of a drain, we could updated the semantics as "move 
watermark to infinity"  "existing timers are executed, but new timers are ignored",
I don't like the idea of dropping timers for drain. I think correct
handling here requires user visibility into whether a pipeline is
draining or not.

and ensure/and update the requirements around OnWindowExpiration callbacks to be a bit 
more insistent on being implemented for correct execution, which is currently the only 
"hard" signal to the SDK side that the window's work is guaranteed to be over, 
and remaining state needs to be addressed by the transform or be garbage collected. This 
remains critical for developing a good pattern for ProcessingTime timers within a Global 
Window too.
+1

On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
Thanks for bringing this up.

My position is that both batch and streaming should wait for
processing time timers, according to local time (with the exception of
tests that can accelerate this via faked clocks).

Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
isomorphic, and can be implemented in terms of each other (at least in
one direction, and likely the other). Both are an indication that I
can't act on something yet due to external constraints (e.g. not all
the data has been published, or I lack sufficient capacity/quota to
push things downstream) but I expect to be able to (or at least would
like to check again) at some time in the processing-time future. I
can't think of a batch or streaming scenario where it would be correct
to not wait at least that long (even in batch inputs, e.g. suppose I'm
tailing logs and was eagerly started before they were fully written,
or waiting for some kind of (non-data-dependent) quiessence or other
operation to finish).


On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz> wrote:
For me it always helps to seek analogy in our physical reality. Stream
processing actually has quite a good analogy for both event-time and
processing-time - the simplest model for this being relativity theory.
Event-time is the time at which events occur _at distant locations_. Due
to finite and invariant speed of light (which is actually really
involved in the explanation why any stream processing is inevitably
unordered) these events are observed (processed) at different times
(processing time, different for different observers). It is perfectly
possible for an observer to observe events at a rate that is higher than
one second per second. This also happens in reality for observers that
travel at relativistic speeds (which might be an analogy for fast -
batch - (re)processing). Besides the invariant speed, there is also
another invariant - local clock (wall time) always ticks exactly at the
rate of one second per second, no matter what. It is not possible to
"move faster or slower" through (local) time.

In my understanding the reason why we do not put any guarantees or
bounds on the delay of firing processing time timers is purely technical
- the processing is (per key) single-threaded, thus any timer has to
wait before any element processing finishes. This is only consequence of
a technical solution, not something fundamental.

Having said that, my point is that according to the above analogy, it
should be perfectly fine to fire processing time timers in batch based
on (local wall) time only. There should be no way of manipulating this
local time (excluding tests). Watermarks should be affected the same way
as any buffering in a state that would happen in a stateful DoFn (i.e.
set timer holds output watermark). We should probably pay attention to
looping timers, but it seems possible to define a valid stopping
condition (input watermark at infinity).

   Jan

On 2/22/24 19:50, Kenneth Knowles wrote:
Forking this thread.

The state of processing time timers in this mode of processing is not
satisfactory and is discussed a lot but we should make everything
explicit.

Currently, a state and timer DoFn has a number of logical watermarks:
(apologies for fixed width not coming through in email lists). Treat
timers as a back edge.

input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
             ^                      |
|--(B)-----------------|
                            timers

(A) Input Element watermark: this is the watermark that promises there
is no incoming element with a timestamp earlier than it. Each input
element's timestamp holds this watermark. Note that *event time timers
firing is according to this watermark*. But a runner commits changes
to this watermark *whenever it wants*, in a way that can be
consistent. So the runner can absolute process *all* the elements
before advancing the watermark (A), and only afterwards start firing
timers.

(B) Timer watermark: this is a watermark that promises no timer is set
with an output timestamp earlier than it. Each timer that has an
output timestamp holds this watermark. Note that timers can set new
timers, indefinitely, so this may never reach infinity even in a drain
scenario.

(C) (derived) total input watermark: this is a watermark that is the
minimum of the two above, and ensures that all state for the DoFn for
expired windows can be GCd after calling @OnWindowExpiration.

(D) output watermark: this is a promise that the DoFn will not output
earlier than the watermark. It is held by the total input watermark.

So a any timer, processing or not, holds the total input watermark
which prevents window GC, hence the timer must be fired. You can set
timers without a timestamp and they will not hold (B) hence not hold
the total input / GC watermark (C). Then if a timer fires for an
expired window, it is ignored. But in general a timer that sets an
output timestamp is saying that it may produce output, so it *must* be
fired, even in batch, for data integrity. There was a time before
timers had output timestamps that we said that you *always* have to
have an @OnWindowExpiration callback for data integrity, and
processing time timers could not hold the watermark. That is changed now.

One main purpose of processing time timers in streaming is to be a
"timeout" for data buffered in state, to eventually flush. In this
case the output timestamp should be the minimum of the elements in
state (or equivalent). In batch, of course, this kind of timer is not
relevant and we should definitely not wait for it, because the goal is
to just get through all the data. We can justify this by saying that
the worker really has no business having any idea what time it really
is, and the runner can just run the clock at whatever speed it wants.

Another purpose, brought up on the Throttle thread, is to wait or
backoff. In this case it would be desired for the timer to actually
cause batch processing to pause and wait. This kind of behavior has
not been explored much. Notably the runner can absolutely process all
elements first, then start to fire any enqueued processing time
timers. In the same way that state in batch can just be in memory,
this *could* just be a call to sleep(). It all seems a bit sketchy so
I'd love clearer opinions.

These two are both operational effects - as you would expect for
processing time timers - and they seem to be in conflict. Maybe they
just need different features?

I'd love to hear some more uses of processing time timers from the
community.

Kenn

Reply via email to