Hi,

In the "classic" Java mode one can set timers which in turn can set
timers which enables to create a timer loop, e.g.:

@ProcessElement
public void processElement(
    ProcessContext context, @TimerId("timer") Timer timer) {
  // Initial timer
  timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
}

@OnTimer("timer")
public void onTimer(
    OnTimerContext context,
    @TimerId("timer") Timer timer) {
  // Trigger again and start endless loop
  timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
}


In portability, since we are only guaranteed to receive the timers at
the end of a bundle when we flush the outputs and have closed the
inputs, it looks like this behavior can only be supported by starting a
new bundle and executing the deferred timers. This also means to hold
back the watermark to allow for these loops. Plus, starting a new bundle
comes with some cost.

Another possibility would be to allow a direct feedback loop, i.e. when
the bundles closes and the timers fire, we can still set new timers.

I wonder do we want to allow a timer loop to execute within a bundle?

It would be possible to limit the number of iterations to perform during
one bundle similar to how runners limit the number of elements or the
duration of a bundle.

-Max

Reply via email to