As far as I understand, Flink rate limits based on local information only. On the other hand, in Flink's case this should extend easily to global information, because the parallelism for both batch and streaming is set before the job is launched and remains unchanged (until a possible manual rescaling). There is also the possibility of adaptive scheduling [1], which would then probably require communicating the parallelism to workers (I'd guess this is not implemented).
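
To illustrate how a fixed parallelism lets a purely local limiter respect a global limit, here is a minimal sketch in plain Python. It is not Flink's or Beam's API: the class name LocalTokenBucket and its parameters are hypothetical, and it assumes work is spread evenly across workers so that each one can take an equal share of the global rate.

```python
import time


class LocalTokenBucket:
    """Hypothetical limiter: enforces one worker's share of a global rate."""

    def __init__(self, global_rate_per_sec, parallelism, clock=time.monotonic):
        if parallelism < 1:
            raise ValueError("parallelism must be at least 1")
        # Assumption: load is evenly distributed, so each worker may use
        # global_rate / parallelism without any cross-worker communication.
        self.rate = global_rate_per_sec / parallelism
        # Allow bursts of up to one second's worth of tokens (at least one).
        self.capacity = max(self.rate, 1.0)
        self.tokens = self.capacity
        self.clock = clock
        self.last = clock()

    def try_acquire(self, n=1.0):
        """Return True and consume n tokens if available, else return False."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at the bucket capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False
```

If the parallelism can change at runtime (as with adaptive scheduling), the per-worker share computed in the constructor becomes stale, which is exactly why the parallelism would need to be communicated to workers.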

Regarding the other points - I'd be in favor of the following:

 a) batch timers - trying to extend the current definition of processing time timers to batch without introducing a new primitive, i.e. in an extended, backwards-compatible way (presumably mostly a terminating condition?)

 b) we could define a CombineFn that would accumulate data from workers and provide the accumulated results back to workers in defined tumbling windows - this could be reused for Throttle, watermark alignment, and probably others
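
A rough sketch of the idea in b), in plain Python so that it runs without Beam installed. The four method names mirror the shape of a Beam CombineFn (create_accumulator, add_input, merge_accumulators, extract_output); everything else (RequestCountCombiner, tumbling_window_start, aggregate_reports, and the (worker_id, timestamp, count) report format) is hypothetical and only illustrates the accumulate-per-window step, not how results would be delivered back to workers.

```python
from collections import defaultdict


class RequestCountCombiner:
    """Hypothetical combiner with the four-method shape of a Beam CombineFn."""

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + element

    def merge_accumulators(self, accumulators):
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator


def tumbling_window_start(timestamp, size):
    """Start of the fixed-size (tumbling) window containing the timestamp."""
    return timestamp - (timestamp % size)


def aggregate_reports(reports, size, combiner):
    """Combine (worker_id, timestamp, count) reports into one value per window."""
    # Step 1: each worker accumulates its own reports per window.
    per_worker = defaultdict(combiner.create_accumulator)
    for worker_id, timestamp, count in reports:
        key = (tumbling_window_start(timestamp, size), worker_id)
        per_worker[key] = combiner.add_input(per_worker[key], count)

    # Step 2: per-worker accumulators are merged into a global result per window.
    by_window = defaultdict(list)
    for (window, _worker), accumulator in per_worker.items():
        by_window[window].append(accumulator)
    return {
        window: combiner.extract_output(combiner.merge_accumulators(accumulators))
        for window, accumulators in sorted(by_window.items())
    }
```

The same skeleton would serve watermark alignment by swapping the sum for a minimum over per-worker watermarks; only the combiner changes, not the windowing or merging.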

Best,

 Jan

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/


On 2/28/24 19:37, Robert Burke wrote:
Sounds like a different variation is either new timer types with those 
distinctions in mind, or additional configuration for ProcessingTime timers 
(defaulting to current behavior) to sort out those cases. Could potentially be 
extended to EventTime timers too for explicitly handling looping timer cases 
(e.g. to signal: this DoFn's OnWindowExpiry method manages the consequences of 
this timer's effect on a Drain, or similar. Or we put that as an additional 
configuration for OnWindowExpiry, along with Drain Awareness...)

I got curious and looked loosely at how Flink solves this problem:  
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/

In short, an explicit rate limiting strategy. A surface glance indicates that 
it relies on local in-memory state, but actual use of these things seems 
relegated to abstract classes (eg for Sinks and similar). It's not clear to me 
whether there is cross worker coordination happening there, or it's assumed to 
be all on a single machine anyway. I'm unfamiliar with how Flink operates, so I 
can't say.

I think I'd be happiest if we could build into Beam a mechanism / paired 
primitive where such a Cross Worker Communication Pair (the processor/server + 
DoFn client) could be built, but not purely be limited to Rate 
limiting/Throttling. Possibly mumble mumble StatePipe? But that feels like a 
harder problem for the time being.

Robert Burke

On 2024/02/28 08:25:35 Jan Lukavský wrote:
On 2/27/24 19:49, Robert Bradshaw via dev wrote:
On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský <je...@seznam.cz> wrote:
On 2/27/24 19:22, Robert Bradshaw via dev wrote:
On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
Pulling out focus points:

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> 
wrote:
I can't act on something yet [...] but I expect to be able to [...] at some 
time in the processing-time future.
I like this as a clear and internally-consistent feature description. It 
describes ProcessContinuation and those timers which serve the same purpose as 
ProcessContinuation.

On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org> 
wrote:
I can't think of a batch or streaming scenario where it would be correct to not 
wait at least that long
The main reason we created timers: to take action in the absence of data. The archetypal 
use case for processing time timers was/is "flush data from state if it has been 
sitting there too long". For this use case, the right behavior for batch is to skip 
the timer. It is actually basically incorrect to wait.
Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).
Runners signal end of data to a DoFn via (input) watermark. Is there a
need for additional information?
Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).

Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.
+1
