This is a "timely" discussion because my next step for Prism is to address 
ProcessingTime.

The description of the watermarks matches my understanding and how it's 
implemented so far in Prism [0], where the "stage" contains one or more 
transforms to be executed by a worker.

My current thinking on processing time is in the issue tracker [1], largely 
focused on the opposite case from throttling: ensuring fast execution for 
pipelines with TestStream. Since TestStream is for tests, and tests should 
execute quickly, there's no reason to do anything but synthetically advance 
the processing time. However, this only gates the Runner's actions for 
processing time, not the Worker/SDK's actions for processing time.
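For concreteness, here's a minimal sketch of what I mean by synthetically 
advancing, runner-side. All names are illustrative, not what Prism actually 
does:

import (
    "sort"
    "time"
)

// syntheticClock is an illustrative runner-side clock. Rather than waiting
// in real time, the engine jumps straight to the next scheduled
// processing-time event once other pending work is drained, so TestStream
// pipelines never actually sleep.
type syntheticClock struct {
    now     time.Time
    pending []time.Time // scheduled processing-time events, kept sorted
}

// Now returns the current synthetic processing time.
func (c *syntheticClock) Now() time.Time { return c.now }

// Schedule records a future processing-time event (a timer firing or a
// ProcessContinuation resume) without sleeping.
func (c *syntheticClock) Schedule(t time.Time) {
    c.pending = append(c.pending, t)
    sort.Slice(c.pending, func(i, j int) bool { return c.pending[i].Before(c.pending[j]) })
}

// AdvanceToNext jumps the clock to the earliest scheduled event, reporting
// whether anything was scheduled at all.
func (c *syntheticClock) AdvanceToNext() bool {
    if len(c.pending) == 0 {
        return false
    }
    c.now, c.pending = c.pending[0], c.pending[1:]
    return true
}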

Of note during my explorations: there are two places ProcessingTime is 
invoked, a relatively scheduled resume for ProcessContinuations, and an 
absolute time for ProcessingTime timers. It's much easier to ignore a 
relative time, but absolute times are a bit harder, since they're never 
going to be based on the Runner's clock: SDK time will be skewed from Runner 
time, as there's no passing of processing time from the Runner to the SDK.
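Roughly, the two call sites look like this in the Go SDK (signatures 
abbreviated from memory, and both DoFns are hypothetical):

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
)

// Relative: an SDF only hands the runner a delay, so clock skew between
// Runner and SDK doesn't matter. (Other required SDF methods elided.)
func (fn *watchFn) ProcessElement(rt *sdf.LockRTracker, src string, emit func(string)) sdf.ProcessContinuation {
    // ... claim and emit available work ...
    return sdf.ResumeProcessingIn(10 * time.Second)
}

// Absolute: a stateful DoFn sets a timer at an instant computed from the
// SDK's own wall clock, which the Runner's clock need not agree with.
type flushFn struct {
    Flush timers.ProcessingTime // constructed via timers.InProcessingTime("flush")
}

func (fn *flushFn) ProcessElement(tp timers.Provider, key, value string, emit func(string)) {
    // ... buffer value in state ...
    fn.Flush.Set(tp, time.Now().Add(time.Minute))
}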

I agree that the main purpose of ProcessingTime timers is to time out state 
for "Streaming" execution, and similarly that OnWindowExpiration exists to 
guarantee any state is addressed for EventTime timer handling within a 
window. I also agree that a "Batch" execution shouldn't wait for 
ProcessingTime timers, but should still execute OnWindowExpirations. 
Notably, the existing behavior of a ProcessingTime timer is not to block 
execution, but to schedule potential execution. In other words, it would be 
wrong to block.

Similarly, ProcessContinuations only declare a suggested resume time. It's 
still up to the DoFn returning the ProcessContinuation, assuming it's time 
dependent, to actually check the time for its desired behavior. It's not a 
block, but an indication of when additional work might be available, and 
that it's probably a waste of time for the runner to schedule the work 
sooner than the recommended delay.
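For example (a hypothetical DoFn, with the same import caveats as above):

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
)

// pollFn re-polls a source. The resume delay below is only a hint to the
// runner; the deadline check is the DoFn's own responsibility.
type pollFn struct {
    Deadline time.Time // hypothetical: when the time-dependent work is due
}

func (fn *pollFn) ProcessElement(rt *sdf.LockRTracker, src string, emit func(string)) sdf.ProcessContinuation {
    if time.Now().Before(fn.Deadline) {
        return sdf.ResumeProcessingIn(30 * time.Second) // a hint, not a block
    }
    // The time has come: do the work and emit.
    emit(src)
    return sdf.StopProcessing()
}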

What's lacking, I think, is a Beam notion of Runner-directed, cross-worker 
global state.

I don't know exactly what that looks like, though, in a way that would be 
useful for more than simply a throttle. One could imagine a Special 
transform that is periodically executed on SDK workers in response to 
something, and a Special SideInput that is how that information is 
propagated to other transforms (like the throttle transform). But that just 
sounds like a variant of Slowly Changing SideInputs, instead of allowing the 
Special transform to direct the runner's sharding and management of some 
other transforms. It's hard to see how useful that is outside of the 
throttle, though.
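For illustration only, here's roughly how that shape falls out of today's 
primitives in the Go SDK: a periodic source drives a budget-fetching DoFn 
whose output feeds the throttle as a side input. fetchBudget, throttle, and 
wireThrottle are all hypothetical stand-ins, and the windowing details would 
need care:

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
)

// fetchBudget stands in for the "Special" transform: on every tick it would
// consult some central authority and emit the current budget.
func fetchBudget(_ []byte, emit func(int64)) {
    emit(1000) // illustrative fixed budget
}

// throttle stands in for the consumer, pacing elements against the budget
// it reads from the side input.
func throttle(elm string, budget func(*int64) bool, emit func(string)) {
    var b int64
    budget(&b)
    // ... pace emission of elm according to b ...
    emit(elm)
}

// wireThrottle sketches the shape. ticks would come from a periodic source
// (e.g. the periodic.Impulse transform); its element type is assumed to be
// []byte here for simplicity.
func wireThrottle(s beam.Scope, input, ticks beam.PCollection) beam.PCollection {
    budgets := beam.ParDo(s, fetchBudget, ticks)
    budgets = beam.WindowInto(s, window.NewFixedWindows(30*time.Second), budgets)
    // The main input needs compatible windowing for the side input mapping.
    windowed := beam.WindowInto(s, window.NewFixedWindows(30*time.Second), input)
    return beam.ParDo(s, throttle, windowed, beam.SideInput{Input: budgets})
}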

We could add a Block primitive that does exactly that. It would be similar 
to timers, but SDK-side execution is held until the Runner sends an Unblock 
signal for a given bundle instruction+blockID combo back to the SDK. But 
again, that seems only useful for a central throttling notion. Technically, 
Google's internal Flume batch processor has the notion of a FlumeThrottle to 
solve exactly this problem.
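To sketch the SDK side of that (entirely hypothetical; none of these 
messages or types exist today):

import (
    "context"
    "sync"
)

// blockRequest and unblock are hypothetical Fn API messages.
type blockRequest struct {
    InstructionID string // the bundle being processed
    BlockID       string // identifies this block point within the bundle
}

type unblock struct {
    InstructionID string
    BlockID       string
}

// blockHandler parks bundles SDK-side until the Runner answers.
type blockHandler struct {
    mu      sync.Mutex
    waiters map[string]chan struct{} // keyed by instructionID+"/"+blockID
    send    func(blockRequest)       // transport to the Runner
}

// Block holds the calling bundle until a matching unblock arrives, or the
// bundle is cancelled.
func (h *blockHandler) Block(ctx context.Context, instID, blockID string) error {
    h.mu.Lock()
    ch := make(chan struct{})
    h.waiters[instID+"/"+blockID] = ch
    h.mu.Unlock()
    h.send(blockRequest{InstructionID: instID, BlockID: blockID})
    select {
    case <-ch: // Runner sent an unblock for this instruction+blockID combo
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// OnUnblock releases the waiter, if any, when the Runner's signal arrives.
func (h *blockHandler) OnUnblock(u unblock) {
    h.mu.Lock()
    defer h.mu.Unlock()
    key := u.InstructionID + "/" + u.BlockID
    if ch, ok := h.waiters[key]; ok {
        close(ch)
        delete(h.waiters, key)
    }
}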

I'd be happiest if we could figure out a less operationally specific 
primitive, but if not, a token-bucket-based BeamThrottle would be useful in 
batch and streaming, and shouldn't be too difficult to add to most runners 
and SDKs (though the amount of work will of course vary).
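For reference, the per-worker core of a token bucket is small; the hard part 
is making it Runner-coordinated across workers. A purely local sketch:

import (
    "math"
    "sync"
    "time"
)

// tokenBucket is the usual refill-on-demand formulation. This local version
// is illustrative only; a BeamThrottle needs the equivalent coordinated by
// the runner across workers.
type tokenBucket struct {
    mu     sync.Mutex
    tokens float64
    max    float64
    rate   float64 // tokens added per second
    last   time.Time
}

func newTokenBucket(rate, burst float64) *tokenBucket {
    return &tokenBucket{tokens: burst, max: burst, rate: rate, last: time.Now()}
}

// Take reports whether n tokens were available, refilling from elapsed wall
// time first. Callers that get false should reschedule rather than spin.
func (b *tokenBucket) Take(n float64) bool {
    b.mu.Lock()
    defer b.mu.Unlock()
    now := time.Now()
    b.tokens = math.Min(b.max, b.tokens+b.rate*now.Sub(b.last).Seconds())
    b.last = now
    if b.tokens < n {
        return false
    }
    b.tokens -= n
    return true
}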

I've gotten away from the core topic. My opinion is "ProcessingTime Timers 
Shouldn't Block Execution" and "We should figure out the best central primitive 
to manage this class of concept".

Robert Burke
Beam Go Busybody

[0] https://github.com/apache/beam/blob/11f9bce485c4f6fe466ff4bf5073d2414e43678c/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1253-L1331
[1] https://github.com/apache/beam/issues/30083


On 2024/02/22 18:50:10 Kenneth Knowles wrote:
> Forking this thread.
> 
> The state of processing time timers in this mode of processing is not
> satisfactory and is discussed a lot but we should make everything explicit.
> 
> Currently, a state and timer DoFn has a number of logical watermarks:
> (apologies for fixed width not coming through in email lists). Treat timers
> as a back edge.
> 
> input --(A)----(C)--> ParDo(DoFn) ----(D)---> output
>             ^                      |
>             |--(B)-----------------|
>                            timers
> 
> 
> (A) Input Element watermark: this is the watermark that promises there is
> no incoming element with a timestamp earlier than it. Each input element's
> timestamp holds this watermark. Note that *event time timers fire
> according to this watermark*. But a runner commits changes to this
> watermark *whenever it wants*, in a way that can be consistent. So the
> runner can absolutely process *all* the elements before advancing the
> watermark (A), and only afterwards start firing timers.
> 
> (B) Timer watermark: this is a watermark that promises no timer is set with
> an output timestamp earlier than it. Each timer that has an output
> timestamp holds this watermark. Note that timers can set new timers,
> indefinitely, so this may never reach infinity even in a drain scenario.
> 
> (C) (derived) total input watermark: this is a watermark that is the
> minimum of the two above, and ensures that all state for the DoFn for
> expired windows can be GCd after calling @OnWindowExpiration.
> 
> (D) output watermark: this is a promise that the DoFn will not output
> earlier than the watermark. It is held by the total input watermark.
> 
> So any timer, processing time or not, holds the total input watermark, which
> prevents window GC, hence the timer must be fired. You can set timers
> without a timestamp and they will not hold (B) hence not hold the total
> input / GC watermark (C). Then if a timer fires for an expired window, it
> is ignored. But in general a timer that sets an output timestamp is saying
> that it may produce output, so it *must* be fired, even in batch, for data
> integrity. There was a time before timers had output timestamps that we
> said that you *always* have to have an @OnWindowExpiration callback for
> data integrity, and processing time timers could not hold the watermark.
> That is changed now.
> 
> One main purpose of processing time timers in streaming is to be a
> "timeout" for data buffered in state, to eventually flush. In this case the
> output timestamp should be the minimum of the elements in state (or
> equivalent). In batch, of course, this kind of timer is not relevant and we
> should definitely not wait for it, because the goal is to just get through
> all the data. We can justify this by saying that the worker really has no
> business having any idea what time it really is, and the runner can just
> run the clock at whatever speed it wants.
> 
> Another purpose, brought up on the Throttle thread, is to wait or backoff.
> In this case it would be desired for the timer to actually cause batch
> processing to pause and wait. This kind of behavior has not been explored
> much. Notably the runner can absolutely process all elements first, then
> start to fire any enqueued processing time timers. In the same way that
> state in batch can just be in memory, this *could* just be a call to
> sleep(). It all seems a bit sketchy so I'd love clearer opinions.
> 
> These two are both operational effects - as you would expect for processing
> time timers - and they seem to be in conflict. Maybe they just need
> different features?
> 
> I'd love to hear some more uses of processing time timers from the
> community.
> 
> Kenn
> 
