Sounds like a different variation is either new timer types with those distinctions in mind, or additional configuration for ProcessingTime timers (defaulting to current behavior) to sort out those cases. This could potentially be extended to EventTime timers too, for explicitly handling looping timer cases (e.g. to signal: "This DoFn's OnWindowExpiry method manages the consequences of a Drain for this timer", or similar). Or we put that as an additional configuration for OnWindowExpiry, along with Drain Awareness...
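
To make the "additional configuration" idea a bit more concrete, here is a rough sketch of what such a setting might carry. None of these names exist in Beam: TimerIntent, EndOfInputAction, ProcessingTimeTimer and action_at_end_of_input are all hypothetical and only illustrate the two cases called out in the quoted thread below ("wait in case there's more data" vs. "wait for something external"), with the unconfigured case left to whatever the runner does today.

```python
from dataclasses import dataclass
from enum import Enum


class TimerIntent(Enum):
    """Hypothetical: why a processing-time timer was set."""
    UNSPECIFIED = "unspecified"          # no configuration given, keep current behavior
    AWAIT_MORE_DATA = "await_more_data"  # e.g. flush state that has sat too long
    AWAIT_EXTERNAL = "await_external"    # e.g. waiting on an outside system


class EndOfInputAction(Enum):
    """Hypothetical: what a runner does with a pending timer once input is complete."""
    RUNNER_DEFAULT = "runner_default"
    SKIP = "skip"
    WAIT_THEN_FIRE = "wait_then_fire"


@dataclass(frozen=True)
class ProcessingTimeTimer:
    """Hypothetical stand-in for a pending timer, not a Beam type."""
    name: str
    fire_at: float  # processing time, in seconds
    intent: TimerIntent = TimerIntent.UNSPECIFIED


def action_at_end_of_input(timer: ProcessingTimeTimer) -> EndOfInputAction:
    """Pick an action for a pending timer when no more input can arrive."""
    if timer.intent is TimerIntent.AWAIT_MORE_DATA:
        # No more data is coming, so waiting out the timer buys nothing.
        return EndOfInputAction.SKIP
    if timer.intent is TimerIntent.AWAIT_EXTERNAL:
        # The external condition is unrelated to input completeness.
        return EndOfInputAction.WAIT_THEN_FIRE
    # Unconfigured timers fall back to whatever the runner does today.
    return EndOfInputAction.RUNNER_DEFAULT
```

A batch runner (or a draining streaming runner) could call something like action_at_end_of_input for each pending timer. Whether SKIP should still hand off to OnWindowExpiry is exactly the open question above, so the sketch does not try to answer it.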

I got curious and looked loosely at how Flink solves this problem:

https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/

In short, an explicit rate limiting strategy. The surface glance indicates that it relies on local in memory state, but actual use of these things seems relegated to abstract classes (eg for Sinks and similar). It's not clear to me whether there is cross worker coordination happening there, or it's assumed to be all on a single machine anyway. I'm unfamiliar with how Flink operates, so I can't say.

I think I'd be happiest if we could build into Beam a mechanism / paired primitive where such a Cross Worker Communication Pair (the processor/server + DoFn client) could be built, but not purely be limited to Rate limiting/Throttling. Possibly mumble mumble StatePipe? But that feels like a harder problem for the time being.

Robert Burke

On 2024/02/28 08:25:35 Jan Lukavský wrote:
>
> On 2/27/24 19:49, Robert Bradshaw via dev wrote:
> > On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> On 2/27/24 19:22, Robert Bradshaw via dev wrote:
> >>> On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>> Pulling out focus points:
> >>>>
> >>>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
> >>>> <dev@beam.apache.org> wrote:
> >>>>> I can't act on something yet [...] but I expect to be able to [...] at
> >>>>> some time in the processing-time future.
> >>>> I like this as a clear and internally-consistent feature description. It
> >>>> describes ProcessContinuation and those timers which serve the same
> >>>> purpose as ProcessContinuation.
> >>>>
> >>>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
> >>>> <dev@beam.apache.org> wrote:
> >>>>> I can't think of a batch or streaming scenario where it would be
> >>>>> correct to not wait at least that long
> >>>> The main reason we created timers: to take action in the absence of
> >>>> data. The archetypal use case for processing time timers was/is "flush
> >>>> data from state if it has been sitting there too long". For this use
> >>>> case, the right behavior for batch is to skip the timer. It is actually
> >>>> basically incorrect to wait.
> >>> Good point calling out the distinction between "I need to wait in case
> >>> there's more data." and "I need to wait for something external." We
> >>> can't currently distinguish between the two, but a batch runner can
> >>> say something definitive about the first. Feels like we need a new
> >>> primitive (or at least new signaling information on our existing
> >>> primitive).
> >> Runners signal end of data to a DoFn via (input) watermark. Is there a
> >> need for additional information?
> > Yes, and I agree that watermarks/event timestamps are a much better
> > way to track data completeness (if possible).
> >
> > Unfortunately processing timers don't specify if they're waiting for
> > additional data or external/environmental change, meaning we can't use
> > the (event time) watermark to determine whether they're safe to
> > trigger.
> +1
>