Sounds like another option is either new timer types designed with those 
distinctions in mind, or additional configuration for ProcessingTime timers 
(defaulting to current behavior) to sort out those cases. That could 
potentially be extended to EventTime timers too, for explicitly handling 
looping timer cases (e.g. to signal: "This DoFn's OnWindowExpiry method 
manages the consequences of a Drain on this timer", or similar. Or we add that 
as additional configuration for OnWindowExpiry, along with Drain awareness...)

I got curious and looked loosely at how Flink solves this problem:  
https://flink.apache.org/2022/11/25/optimising-the-throughput-of-async-sinks-using-a-custom-ratelimitingstrategy/

In short, an explicit rate limiting strategy. At a surface glance it relies on 
local in-memory state, but actual use of these things seems to be confined to 
abstract classes (e.g. for Sinks and similar). It's not clear to me whether 
there is cross-worker coordination happening there, or whether it's assumed to 
all be on a single machine anyway. I'm unfamiliar with how Flink operates, so I 
can't say.
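
To make "local in-memory state" concrete, here is a rough sketch of what a 
per-worker limiter could look like. This is not Flink's API and not a Beam 
proposal: the class name, method names, and the additive-increase / 
multiplicative-decrease policy are all assumptions picked for illustration. 
The point is only that nothing here is shared across workers.

```python
from dataclasses import dataclass


@dataclass
class LocalAimdLimiter:
    """Hypothetical per-worker limiter; all state lives in this one process."""

    max_in_flight: int = 10  # current allowance, adjusted as requests complete
    floor: int = 1           # never shrink the allowance below this
    ceiling: int = 100       # never grow the allowance above this
    in_flight: int = 0       # elements sent but not yet acknowledged

    def should_block(self, batch_size: int) -> bool:
        # Block when sending this batch would exceed the current allowance.
        return self.in_flight + batch_size > self.max_in_flight

    def on_start(self, batch_size: int) -> None:
        # Record that a batch has been sent to the external system.
        self.in_flight += batch_size

    def on_complete(self, batch_size: int, failed: bool) -> None:
        # Release the batch, then adjust the allowance based on the outcome.
        self.in_flight -= batch_size
        if failed:
            # Multiplicative decrease when the external system pushes back.
            self.max_in_flight = max(self.floor, self.max_in_flight // 2)
        else:
            # Additive increase when the request succeeds.
            self.max_in_flight = min(self.ceiling, self.max_in_flight + 1)
```

Each worker running this adapts only to the failures it sees itself, so the 
aggregate rate across N workers is not bounded by any single instance. That's 
the gap cross-worker coordination would have to fill.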

I think I'd be happiest if we could build into Beam a mechanism / paired 
primitive with which such a Cross Worker Communication Pair (the 
processor/server + DoFn client) could be built, without being limited purely 
to rate limiting/throttling. Possibly mumble mumble StatePipe? But that feels 
like a harder problem for the time being.

Robert Burke

On 2024/02/28 08:25:35 Jan Lukavský wrote:
> 
> On 2/27/24 19:49, Robert Bradshaw via dev wrote:
> > On Tue, Feb 27, 2024 at 10:39 AM Jan Lukavský <je...@seznam.cz> wrote:
> >> On 2/27/24 19:22, Robert Bradshaw via dev wrote:
> >>> On Mon, Feb 26, 2024 at 11:45 AM Kenneth Knowles <k...@apache.org> wrote:
> >>>> Pulling out focus points:
> >>>>
> >>>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
> >>>> <dev@beam.apache.org> wrote:
> >>>>> I can't act on something yet [...] but I expect to be able to [...] at 
> >>>>> some time in the processing-time future.
> >>>> I like this as a clear and internally-consistent feature description. It 
> >>>> describes ProcessContinuation and those timers which serve the same 
> >>>> purpose as ProcessContinuation.
> >>>>
> >>>> On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev 
> >>>> <dev@beam.apache.org> wrote:
> >>>>> I can't think of a batch or streaming scenario where it would be 
> >>>>> correct to not wait at least that long
> >>>> The main reason we created timers: to take action in the absence of 
> >>>> data. The archetypal use case for processing time timers was/is "flush 
> >>>> data from state if it has been sitting there too long". For this use 
> >>>> case, the right behavior for batch is to skip the timer. It is actually 
> >>>> basically incorrect to wait.
> >>> Good point calling out the distinction between "I need to wait in case
> >>> there's more data." and "I need to wait for something external." We
> >>> can't currently distinguish between the two, but a batch runner can
> >>> say something definitive about the first. Feels like we need a new
> >>> primitive (or at least new signaling information on our existing
> >>> primitive).
> >> Runners signal end of data to a DoFn via (input) watermark. Is there a
> >> need for additional information?
> > Yes, and I agree that watermarks/event timestamps are a much better
> > way to track data completeness (if possible).
> >
> > Unfortunately processing timers don't specify if they're waiting for
> > additional data or external/environmental change, meaning we can't use
> > the (event time) watermark to determine whether they're safe to
> > trigger.
> +1
> 
