An "as fast as it can" runner with dynamic splits would ultimately split
to the system's maximum available parallelism (for stateful DoFns, this is
the number of keys; for SplittableDoFns, this is the maximum sharding of
each input element's restriction). That's what would happen with a "normal"
sleep.

WRT Portability, this means adding a current ProcessingTime field to the
ProcessBundleRequest, and likely also to the ProgressRequest so the runner
could coordinate. ProgressResponse may then need an "asleepUntil" field to
communicate back the state of the bundle, which the runner could then use
to better time its next ProgressRequest, and potentially arrest dynamic
splitting for that bundle. After all, the sleeping bundle is blocked until
processing time has advanced anyway; no progress can be made.
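
To make the exchange above concrete, here is a rough sketch in plain Python.
It is not the Fn API: the asleep_until field, the Bundle class, and the two
runner-side helpers are all hypothetical names, and times are plain seconds.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProgressResponse:
    # Hypothetical field: processing time (seconds) until which the bundle sleeps.
    asleep_until: Optional[float] = None


class Bundle:
    """SDK-side bundle that sleeps against the runner's processing time."""

    def __init__(self, processing_time: float):
        # Seeded from the (hypothetical) field on ProcessBundleRequest.
        self.processing_time = processing_time
        self.asleep_until: Optional[float] = None

    def sleep_for(self, seconds: float) -> None:
        # Record the wake-up instant instead of blocking the thread.
        self.asleep_until = self.processing_time + seconds

    def on_progress_request(self, processing_time: float) -> ProgressResponse:
        # Each ProgressRequest carries the runner's current processing time.
        self.processing_time = max(self.processing_time, processing_time)
        if self.asleep_until is not None and self.processing_time >= self.asleep_until:
            self.asleep_until = None  # unblocked; processing may resume
        return ProgressResponse(asleep_until=self.asleep_until)


def next_poll_time(now: float, resp: ProgressResponse, default_interval: float) -> float:
    """Runner side: poll at the wake-up instant rather than on a fixed cadence."""
    if resp.asleep_until is None:
        return now + default_interval
    return max(now, resp.asleep_until)


def may_split(resp: ProgressResponse) -> bool:
    """Runner side: skip dynamic splitting for a bundle that is only waiting."""
    return resp.asleep_until is None
```

With this shape, a bundle that slept for 30s at processing time 100 reports
asleep_until=130 on the next progress call, and the runner can both schedule
its next ProgressRequest for 130 and leave the bundle unsplit until then.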

I like moving the abstraction out of the timer space, as it better aligns
with user intent for the throttle case, and it doesn't require a Stateful
DoFn to operate (orthogonal!), meaning it's useful for It also solves the
testing issue WRT ProcessingTime timers using an absolute time, rather than
a relative time, as the SDK can rebuild its relative setters for output
time on the new canonical processing time, without user code changing.
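
As an illustration of the relative-setter point, a minimal sketch in plain
Python follows. ProcessingTimeTimer and RunnerClock are made-up names for this
example, not SDK classes; the only idea shown is that the relative setter
resolves against an injected "now" instead of the local wall clock.

```python
from typing import Callable, Optional


class RunnerClock:
    """Stand-in for the runner-provided canonical processing time (seconds)."""

    def __init__(self, start: float):
        self.current = start

    def advance(self, seconds: float) -> None:
        self.current += seconds

    def now(self) -> float:
        return self.current


class ProcessingTimeTimer:
    """Toy timer whose relative setter resolves against an injected clock."""

    def __init__(self, now: Callable[[], float]):
        self._now = now  # supplied by the SDK harness, not by user code
        self.fire_at: Optional[float] = None  # absolute processing time

    def set_relative(self, delay_seconds: float) -> None:
        # User code only states the delay; the absolute instant is derived
        # from whatever clock the harness handed in.
        self.fire_at = self._now() + delay_seconds
```

A test can construct RunnerClock(1000.0), call set_relative(60), and check
that fire_at is 1060 without ever touching real time, while the user's DoFn
code stays the same under a real clock.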

The sleeping in-progress bundle naturally holds back the watermark too.

I suspect this mechanism would end up tending to over-throttle as Reuven
described earlier, since the user is only pushing back on immediate
processing for the current element, not necessarily all elements. This is
particularly likely if there's a long gap between ProgressRequests for the
bundle and the runner doesn't adapt its cadence.

An external source of rate doesn't really exist, other than some external
source that can provide throttle information. Time skew would remain
between the runner system and the external system, but for a throttle
that's likely fine.

A central notion of ProcessingTime also allows the runner to "smear"
processing time, so if there's a particularly long delay, it doesn't need to
catch up at once. I don't think that's relevant for the throttle case
though, since with the described clock mechanism and the communication back
to the runner, the unblocking notion is probably fine.
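
For what I mean by "smear", a tiny plain-Python sketch: the runner advances
the processing time it reports toward wall time by at most a fixed step per
tick. The function name and the max_step parameter are assumptions made for
this example only.

```python
def smeared_processing_time(reported: float, wall: float, max_step: float) -> float:
    """Move the reported processing time toward wall time by at most max_step."""
    if wall <= reported:
        return reported  # never move processing time backwards
    return min(wall, reported + max_step)
```

Starting from a reported time of 0 with wall time at 10 and max_step of 4,
successive ticks report 4, then 8, then 10, so a long stall is absorbed over
several ticks instead of all at once.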

We'd need a discussion of what an SDK must do if the runner doesn't support
the central clock, for completeness and consistency.


On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský <je...@seznam.cz> wrote:

> On 2/27/24 14:51, Kenneth Knowles wrote:
>
> I very much like the idea of processing time clock as a parameter
> to @ProcessElement. That will be obviously useful and remove a source of
> inconsistency, in addition to letting the runner/SDK harness control it. I
> also like the idea of passing a Sleeper or to @ProcessElement. These are
> both good practices for testing and flexibility and runner/SDK language
> differences.
>
> In your (a) (b) (c) can you be more specific about which watermarks you
> are referring to? Are they the same as in my opening email? If so, then
> what you describe is what we already have.
>
> Yes, we have that for streaming, but it does not work this way in batch.
> In my understanding we violate (a), we ignore (b) because we fire timers at
> GC time only and (c) is currently relevant only immediately preceding
> window GC time, but can be defined more generally. But essentially yes, I
> was just trying to restate the streaming processing time semantics in the
> limited batch case.
>
>
> Kenn
>
> On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> I think that before we introduce a possibly somewhat duplicate new
>> feature we should be certain that it is really semantically different. I'll
>> rephrase the two cases:
>>
>>  a) need to wait and block data (delay) - the use case is the motivating
>> example of Throttle transform
>>
>>  b) act without data, not block
>>
>> Provided we align processing time with local machine clock (or better,
>> because of testing, make current processing time available via context to
>> @ProcessElement) it seems possible to unify both cases under slightly
>> updated semantics of processing time timer in batch:
>>
>>  a) processing time timers fire with best-effort, i.e. trying to minimize
>> delay between firing timestamp and timer's timestamp
>>  b) timer is valid only in the context of current key-window, once
>> watermark passes window GC time for the particular window that created the
>> timer, it is ignored
>>  c) if timer has output timestamp, this timestamp holds watermark (but
>> this is currently probably a no-op, because runners currently do not propagate
>> (per-key) watermark in batch, I assume)
>>
>> In case b) it might be necessary to distinguish cases when the timer has an
>> output timestamp; if so, it probably should be taken into account.
>>
>> Now, such semantics should be quite aligned with what we do in streaming
>> case and what users generally expect. The blocking part can be implemented
>> in @ProcessElement using buffer & timer, once there is need to wait, it can
>> be implemented in user code using plain sleep(). That is due to the
>> alignment between local time and definition of processing time. If we had
>> some reason to be able to run faster-than-wall-clock (as I'm still not in
>> favor of that), we could do that using ProcessContext.sleep(). Delaying
>> processing in the @ProcessElement should result in backpressuring and
>> backpropagation of this backpressure from the Throttle transform to the
>> sources as mentioned (of course this is only for the streaming case).
>>
>> Is there anything missing in such definition that would still require
>> splitting the timers into two distinct features?
>>
>>  Jan
>> On 2/26/24 21:22, Kenneth Knowles wrote:
>>
>> Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.
>>
>> OutputTime is always an event time timestamp so it isn't even allowed to
>> be set outside the window (or you'd end up with an element assigned to a
>> window that it isn't within, since OutputTime essentially represents
>> reserving the right to output an element with that timestamp)
>>
>> Kenn
>>
>> On Mon, Feb 26, 2024 at 3:19 PM Robert Burke <rob...@frantil.com> wrote:
>>
>>> Agreed that a retroactive behavior change would be bad, even if tied to
>>> a beam version change. I agree that it meshes well with the general theme
>>> of State & Timers exposing underlying primitives for implementing Windowing
>>> and similar. I'd say the distinction between the two might be additional
>>> complexity for users to grok, and would need to be documented well, as both
>>> operate in the ProcessingTime domain, but differently.
>>>
>>> What to call this new timer then? DelayTimer?
>>>
>>> "A DelayTimer sets an instant in ProcessingTime at which point
>>> computations can continue. Runners will prevent the EventTimer watermark
>>> from advancing past the set OutputTime until Processing Time has advanced
>>> to at least the provided instant to execute the timer's callback. This can
>>> be used to allow the runner to constrain pipeline throughput with user
>>> guidance."
>>>
>>> I'd probably add that a timer with an output time outside of the window
>>> would not be guaranteed to fire, and that OnWindowExpiry is the correct way
>>> to ensure cleanup occurs.
>>>
>>> No solution to the Looping Timers on Drain problem here, but I think
>>> that's ultimately an orthogonal discussion, and will restrain my thoughts
>>> on that for now.
>>>
>>> This isn't a proposal, but exploring the solution space within our
>>> problem. We'd want to break down exactly what is different and the same for
>>> the 3 kinds of timers...
>>>
>>>
>>>
>>>
>>> On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Pulling out focus points:
>>>>
>>>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>> > I can't act on something yet [...] but I expect to be able to [...]
>>>> at some time in the processing-time future.
>>>>
>>>> I like this as a clear and internally-consistent feature description.
>>>> It describes ProcessContinuation and those timers which serve the same
>>>> purpose as ProcessContinuation.
>>>>
>>>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>> > I can't think of a batch or streaming scenario where it would be
>>>> correct to not wait at least that long
>>>>
>>>> The main reason we created timers: to take action in the absence of
>>>> data. The archetypal use case for processing time timers was/is "flush data
>>>> from state if it has been sitting there too long". For this use case, the
>>>> right behavior for batch is to skip the timer. It is actually basically
>>>> incorrect to wait.
>>>>
>>>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org>
>>>> wrote:
>>>> > It doesn't require a new primitive.
>>>>
>>>> IMO what's being proposed *is* a new primitive. I think it is a good
>>>> primitive. It is the underlying primitive to ProcessContinuation. It
>>>> would be user-friendly as a kind of timer. But if we made this the behavior
>>>> of processing time timers retroactively, it would break everyone using them
>>>> to flush data who is also reprocessing data.
>>>>
>>>> There's two very different use cases ("I need to wait, and block data"
>>>> vs "I want to act without data, aka NOT wait for data") and I think we
>>>> should serve both of them, but it doesn't have to be with the same
>>>> low-level feature.
>>>>
>>>> Kenn
>>>>
>>>>
>>>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> On Fri, Feb 23, 2024 at 3:54 PM Robert Burke <lostl...@apache.org>
>>>>> wrote:
>>>>> >
>>>>> > While I'm currently on the other side of the fence, I would not be
>>>>> against changing/requiring the semantics of ProcessingTime constructs to 
>>>>> be
>>>>> "must wait and execute" as such a solution, and enables the Proposed
>>>>> "batch" process continuation throttling mechanism to work as hypothesized
>>>>> for both "batch" and "streaming" execution.
>>>>> >
>>>>> > There's a lot to like, as it leans Beam further into the unification
>>>>> of Batch and Stream, with one fewer exception (eg. unifies timer 
>>>>> experience
>>>>> further). It doesn't require a new primitive. It probably matches more 
>>>>> with
>>>>> user expectations anyway.
>>>>> >
>>>>> > It does cause looping timer execution with processing time to be a
>>>>> problem for Drains however.
>>>>>
>>>>> I think we have a problem with looping timers plus drain (a mostly
>>>>> streaming idea anyway) regardless.
>>>>>
>>>>> > I'd argue though that in the case of a drain, we could update the
>>>>> semantics as "move watermark to infinity"  "existing timers are executed,
>>>>> but new timers are ignored",
>>>>>
>>>>> I don't like the idea of dropping timers for drain. I think correct
>>>>> handling here requires user visibility into whether a pipeline is
>>>>> draining or not.
>>>>>
>>>>> > and ensure/and update the requirements around OnWindowExpiration
>>>>> callbacks to be a bit more insistent on being implemented for correct
>>>>> execution, which is currently the only "hard" signal to the SDK side that
>>>>> the window's work is guaranteed to be over, and remaining state needs to 
>>>>> be
>>>>> addressed by the transform or be garbage collected. This remains critical
>>>>> for developing a good pattern for ProcessingTime timers within a Global
>>>>> Window too.
>>>>>
>>>>> +1
>>>>>
>>>>> >
>>>>> > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
>>>>> > > Thanks for bringing this up.
>>>>> > >
>>>>> > > My position is that both batch and streaming should wait for
>>>>> > > processing time timers, according to local time (with the
>>>>> exception of
>>>>> > > tests that can accelerate this via faked clocks).
>>>>> > >
>>>>> > > Both ProcessContinuations delays and ProcessingTimeTimers are IMHO
>>>>> > > isomorphic, and can be implemented in terms of each other (at
>>>>> least in
>>>>> > > one direction, and likely the other). Both are an indication that I
>>>>> > > can't act on something yet due to external constraints (e.g. not
>>>>> all
>>>>> > > the data has been published, or I lack sufficient capacity/quota to
>>>>> > > push things downstream) but I expect to be able to (or at least
>>>>> would
>>>>> > > like to check again) at some time in the processing-time future. I
>>>>> > > can't think of a batch or streaming scenario where it would be
>>>>> correct
>>>>> > > to not wait at least that long (even in batch inputs, e.g. suppose
>>>>> I'm
>>>>> > > tailing logs and was eagerly started before they were fully
>>>>> written,
>>>>> > > or waiting for some kind of (non-data-dependent) quiescence or
>>>>> other
>>>>> > > operation to finish).
>>>>> > >
>>>>> > >
>>>>> > > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský <je...@seznam.cz>
>>>>> wrote:
>>>>> > > >
>>>>> > > > For me it always helps to seek analogy in our physical reality.
>>>>> Stream
>>>>> > > > processing actually has quite a good analogy for both event-time
>>>>> and
>>>>> > > > processing-time - the simplest model for this being relativity
>>>>> theory.
>>>>> > > > Event-time is the time at which events occur _at distant
>>>>> locations_. Due
>>>>> > > > to finite and invariant speed of light (which is actually really
>>>>> > > > involved in the explanation why any stream processing is
>>>>> inevitably
>>>>> > > > unordered) these events are observed (processed) at different
>>>>> times
>>>>> > > > (processing time, different for different observers). It is
>>>>> perfectly
>>>>> > > > possible for an observer to observe events at a rate that is
>>>>> higher than
>>>>> > > > one second per second. This also happens in reality for
>>>>> observers that
>>>>> > > > travel at relativistic speeds (which might be an analogy for
>>>>> fast -
>>>>> > > > batch - (re)processing). Besides the invariant speed, there is
>>>>> also
>>>>> > > > another invariant - local clock (wall time) always ticks exactly
>>>>> at the
>>>>> > > > rate of one second per second, no matter what. It is not
>>>>> possible to
>>>>> > > > "move faster or slower" through (local) time.
>>>>> > > >
>>>>> > > > In my understanding the reason why we do not put any guarantees
>>>>> or
>>>>> > > > bounds on the delay of firing processing time timers is purely
>>>>> technical
>>>>> > > > - the processing is (per key) single-threaded, thus any timer
>>>>> has to
>>>>> > > > wait before any element processing finishes. This is only
>>>>> consequence of
>>>>> > > > a technical solution, not something fundamental.
>>>>> > > >
>>>>> > > > Having said that, my point is that according to the above
>>>>> analogy, it
>>>>> > > > should be perfectly fine to fire processing time timers in batch
>>>>> based
>>>>> > > > on (local wall) time only. There should be no way of
>>>>> manipulating this
>>>>> > > > local time (excluding tests). Watermarks should be affected the
>>>>> same way
>>>>> > > > as any buffering in a state that would happen in a stateful DoFn
>>>>> (i.e.
>>>>> > > > set timer holds output watermark). We should probably pay
>>>>> attention to
>>>>> > > > looping timers, but it seems possible to define a valid stopping
>>>>> > > > condition (input watermark at infinity).
>>>>> > > >
>>>>> > > >   Jan
>>>>> > > >
>>>>> > > > On 2/22/24 19:50, Kenneth Knowles wrote:
>>>>> > > > > Forking this thread.
>>>>> > > > >
>>>>> > > > > The state of processing time timers in this mode of processing
>>>>> is not
>>>>> > > > > satisfactory and is discussed a lot but we should make
>>>>> everything
>>>>> > > > > explicit.
>>>>> > > > >
>>>>> > > > > Currently, a state and timer DoFn has a number of logical
>>>>> watermarks:
>>>>> > > > > (apologies for fixed width not coming through in email lists).
>>>>> Treat
>>>>> > > > > timers as a back edge.
>>>>> > > > >
>>>>> > > > > input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
>>>>> > > > >             ^                      |
>>>>> > > > >             |--(B)-----------------|
>>>>> > > > >                            timers
>>>>> > > > >
>>>>> > > > > (A) Input Element watermark: this is the watermark that
>>>>> promises there
>>>>> > > > > is no incoming element with a timestamp earlier than it. Each
>>>>> input
>>>>> > > > > element's timestamp holds this watermark. Note that *event
>>>>> time timers
>>>>> > > > > firing is according to this watermark*. But a runner commits
>>>>> changes
>>>>> > > > > to this watermark *whenever it wants*, in a way that can be
>>>>> > > > > consistent. So the runner can absolutely process *all* the
>>>>> elements
>>>>> > > > > before advancing the watermark (A), and only afterwards start
>>>>> firing
>>>>> > > > > timers.
>>>>> > > > >
>>>>> > > > > (B) Timer watermark: this is a watermark that promises no
>>>>> timer is set
>>>>> > > > > with an output timestamp earlier than it. Each timer that has
>>>>> an
>>>>> > > > > output timestamp holds this watermark. Note that timers can
>>>>> set new
>>>>> > > > > timers, indefinitely, so this may never reach infinity even in
>>>>> a drain
>>>>> > > > > scenario.
>>>>> > > > >
>>>>> > > > > (C) (derived) total input watermark: this is a watermark that
>>>>> is the
>>>>> > > > > minimum of the two above, and ensures that all state for the
>>>>> DoFn for
>>>>> > > > > expired windows can be GCd after calling @OnWindowExpiration.
>>>>> > > > >
>>>>> > > > > (D) output watermark: this is a promise that the DoFn will not
>>>>> output
>>>>> > > > > earlier than the watermark. It is held by the total input
>>>>> watermark.
>>>>> > > > >
>>>>> > > > > So any timer, processing or not, holds the total input
>>>>> watermark
>>>>> > > > > which prevents window GC, hence the timer must be fired. You
>>>>> can set
>>>>> > > > > timers without a timestamp and they will not hold (B) hence
>>>>> not hold
>>>>> > > > > the total input / GC watermark (C). Then if a timer fires for
>>>>> an
>>>>> > > > > expired window, it is ignored. But in general a timer that
>>>>> sets an
>>>>> > > > > output timestamp is saying that it may produce output, so it
>>>>> *must* be
>>>>> > > > > fired, even in batch, for data integrity. There was a time
>>>>> before
>>>>> > > > > timers had output timestamps that we said that you *always*
>>>>> have to
>>>>> > > > > have an @OnWindowExpiration callback for data integrity, and
>>>>> > > > > processing time timers could not hold the watermark. That is
>>>>> changed now.
>>>>> > > > >
>>>>> > > > > One main purpose of processing time timers in streaming is to
>>>>> be a
>>>>> > > > > "timeout" for data buffered in state, to eventually flush. In
>>>>> this
>>>>> > > > > case the output timestamp should be the minimum of the
>>>>> elements in
>>>>> > > > > state (or equivalent). In batch, of course, this kind of timer
>>>>> is not
>>>>> > > > > relevant and we should definitely not wait for it, because the
>>>>> goal is
>>>>> > > > > to just get through all the data. We can justify this by
>>>>> saying that
>>>>> > > > > the worker really has no business having any idea what time it
>>>>> really
>>>>> > > > > is, and the runner can just run the clock at whatever speed it
>>>>> wants.
>>>>> > > > >
>>>>> > > > > Another purpose, brought up on the Throttle thread, is to wait
>>>>> or
>>>>> > > > > backoff. In this case it would be desired for the timer to
>>>>> actually
>>>>> > > > > cause batch processing to pause and wait. This kind of
>>>>> behavior has
>>>>> > > > > not been explored much. Notably the runner can absolutely
>>>>> process all
>>>>> > > > > elements first, then start to fire any enqueued processing time
>>>>> > > > > timers. In the same way that state in batch can just be in
>>>>> memory,
>>>>> > > > > this *could* just be a call to sleep(). It all seems a bit
>>>>> sketchy so
>>>>> > > > > I'd love clearer opinions.
>>>>> > > > >
>>>>> > > > > These two are both operational effects - as you would expect
>>>>> for
>>>>> > > > > processing time timers - and they seem to be in conflict.
>>>>> Maybe they
>>>>> > > > > just need different features?
>>>>> > > > >
>>>>> > > > > I'd love to hear some more uses of processing time timers from
>>>>> the
>>>>> > > > > community.
>>>>> > > > >
>>>>> > > > > Kenn
>>>>> > >
>>>>>
>>>>
