Hi,
In the "classic" Java mode one can set timers which in turn can set
timers which enables to create a timer loop, e.g.:
@ProcessElement
public void processElement(
ProcessContext context, @TimerId("timer") Timer timer) {
// Initial timer
timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
}
@OnTimer("timer")
public void onTimer(
OnTimerContext context,
@TimerId("timer") Timer timer) {
// Trigger again and start endless loop
timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
}
In portability, since we are only guaranteed to receive the timers at
the end of a bundle when we flush the outputs and have closed the
inputs, it looks like this behavior can only be supported by starting a
new bundle and executing the deferred timers. This also means to hold
back the watermark to allow for these loops. Plus, starting a new bundle
comes with some cost.
Another possibility would be to allow a direct feedback loop, i.e. when
the bundles closes and the timers fire, we can still set new timers.
I wonder do we want to allow a timer loop to execute within a bundle?
It would be possible to limit the number of iterations to perform during
one bundle similar to how runners limit the number of elements or the
duration of a bundle.
-Max