I'm not sure I understand the difference - do any "classic" runners add new timers to the bundle? I know that at least the Dataflow runner would end up the new timer in a new bundle.
One thing we do need to ensure is that modifications to timers are reflected in a bundle. So if a bundle contains a processElement and a processTimer and the processElement modifies the timer, that should be reflected in timer firing. On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <[email protected]> wrote: > In non portable implementations you would have to wait till the > element/timer was finished processing before you could process any newly > created timers. Timers that are created with the same key+window+timer > family overwrite existing timers that have been set which can lead to a > timer being overwritten multiple times while an element/timer is being > processed and you wouldn't want to process a newly created timer with the > incorrect values or process a timer you shouldn't have. > > In portable implementations, you can only safely say that element > processing is done when the bundle completes. There could be value in > exposing when an element is done since this could have usage in other parts > of the system such as when a large GBK value is done. > > On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels <[email protected]> wrote: > >> Hi, >> >> In the "classic" Java mode one can set timers which in turn can set >> timers which enables to create a timer loop, e.g.: >> >> @ProcessElement >> public void processElement( >> ProcessContext context, @TimerId("timer") Timer timer) { >> // Initial timer >> timer.withOutputTimestamp(new Instant(0)).set(context.timestamp()); >> } >> >> @OnTimer("timer") >> public void onTimer( >> OnTimerContext context, >> @TimerId("timer") Timer timer) { >> // Trigger again and start endless loop >> timer.withOutputTimestamp(new Instant(0)).set(context.timestamp()); >> } >> >> >> In portability, since we are only guaranteed to receive the timers at >> the end of a bundle when we flush the outputs and have closed the >> inputs, it looks like this behavior can only be supported by starting a >> new bundle and executing the deferred timers. This also means to hold >> back the watermark to allow for these loops. Plus, starting a new bundle >> comes with some cost. >> >> Another possibility would be to allow a direct feedback loop, i.e. when >> the bundles closes and the timers fire, we can still set new timers. >> >> I wonder do we want to allow a timer loop to execute within a bundle? >> >> It would be possible to limit the number of iterations to perform during >> one bundle similar to how runners limit the number of elements or the >> duration of a bundle. >> >> -Max >> >
