From my understanding Flink rate limits based on local information
only. On the other hand - in case of Flink - this should easily extend
to global information, because the parallelism for both batch and
streaming is set before job is launched and remains unchanged (until
possible manual rescaling). There is a possibility of adaptive
scheduling [1] which would then probably require communication of the
parallelism to workers (I'd guess this is not implemented).
Regarding the other points - I'd be in favor of the following:
a) batch timers - trying to extend the current definition of
processing time timers to batch without introduction new primitive, so
in an extended, backwards compatible way (presumably mostly terminating
condition?)
b) we could define a CombineFn that would accumulate data from workers
and provide accumulated results in defined tumbling windows back to
workers - this could be reused both Throttle, watermark alignment, and
probably others
Best,
Jan
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/
On 2/28/24 19:37, Robert Burke wrote:
Sounds like a different variation is either new timer types with those
distinctions in mind, or additional configuration for ProcessingTime timers
(defaulting to current behavior) to sort out those cases. Could potentially be
extended to EventTime timers too for explicitly handling looping timer cases
(eg. To signal: This DoFn's OnWindowExpiry method manages the consequences of
this timer's effect of a Drain. Or similar. Or we put that as a additional
configuration for OnWindowExpiry, along with Drain Awareness...)
I got curious and looked loosely at how Flink solves this problem:
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/
In short, an explicit rate limiting strategy. The surface glance indicates that
it relies on local in memory state, but actual use of these things seems
relegated to abstract classes (eg for Sinks and similar). It's not clear to me
whether there is cross worker coordination happening there, or it's assumed to
be all on a single machine anyway. I'm unfamiliar with how Flink operates, so I
can't say.
I think I'd be happiest if we could build into Beam a mechanism / paired
primitive where such a Cross Worker Communication Pair (the processor/server +
DoFn client) could be built, but not purely be limited to Rate
limiting/Throttling. Possibly mumble mumble StatePipe? But that feels like a
harder problem for the time being.
Robert Burke
On 2024/02/28 08:25:35 Jan Lukavský wrote:
On 2/27/24 19:49, Robert Bradshaw via dev wrote:
On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský <je...@seznam.cz> wrote:
On 2/27/24 19:22, Robert Bradshaw via dev wrote:
On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
Pulling out focus points:
On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org>
wrote:
I can't act on something yet [...] but I expect to be able to [...] at some
time in the processing-time future.
I like this as a clear and internally-consistent feature description. It
describes ProcessContinuation and those timers which serve the same purpose as
ProcessContinuation.
On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev <dev@beam.apache.org>
wrote:
I can't think of a batch or streaming scenario where it would be correct to not
wait at least that long
The main reason we created timers: to take action in the absence of data. The archetypal
use case for processing time timers was/is "flush data from state if it has been
sitting there too long". For this use case, the right behavior for batch is to skip
the timer. It is actually basically incorrect to wait.
Good point calling out the distinction between "I need to wait in case
there's more data." and "I need to wait for something external." We
can't currently distinguish between the two, but a batch runner can
say something definitive about the first. Feels like we need a new
primitive (or at least new signaling information on our existing
primitive).
Runners signal end of data to a DoFn via (input) watermark. Is there a
need for additional information?
Yes, and I agree that watermarks/event timestamps are a much better
way to track data completeness (if possible).
Unfortunately processing timers don't specify if they're waiting for
additional data or external/environmental change, meaning we can't use
the (event time) watermark to determine whether they're safe to
trigger.
+1