I'm not sure I understand the difference - do any "classic" runners add new
timers to the bundle? I know that at least the Dataflow runner would end up
the new timer in a new bundle.

One thing we do need to ensure is that modifications to timers are
reflected in a bundle. So if a bundle contains a processElement and a
processTimer and the processElement modifies the timer, that should be
reflected in timer firing.

On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <[email protected]> wrote:

> In non portable implementations you would have to wait till the
> element/timer was finished processing before you could process any newly
> created timers. Timers that are created with the same key+window+timer
> family overwrite existing timers that have been set which can lead to a
> timer being overwritten multiple times while an element/timer is being
> processed and you wouldn't want to process a newly created timer with the
> incorrect values or process a timer you shouldn't have.
>
> In portable implementations, you can only safely say that element
> processing is done when the bundle completes. There could be value in
> exposing when an element is done since this could have usage in other parts
> of the system such as when a large GBK value is done.
>
> On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels <[email protected]> wrote:
>
>> Hi,
>>
>> In the "classic" Java mode one can set timers which in turn can set
>> timers which enables to create a timer loop, e.g.:
>>
>> @ProcessElement
>> public void processElement(
>>     ProcessContext context, @TimerId("timer") Timer timer) {
>>   // Initial timer
>>   timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
>> }
>>
>> @OnTimer("timer")
>> public void onTimer(
>>     OnTimerContext context,
>>     @TimerId("timer") Timer timer) {
>>   // Trigger again and start endless loop
>>   timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
>> }
>>
>>
>> In portability, since we are only guaranteed to receive the timers at
>> the end of a bundle when we flush the outputs and have closed the
>> inputs, it looks like this behavior can only be supported by starting a
>> new bundle and executing the deferred timers. This also means to hold
>> back the watermark to allow for these loops. Plus, starting a new bundle
>> comes with some cost.
>>
>> Another possibility would be to allow a direct feedback loop, i.e. when
>> the bundles closes and the timers fire, we can still set new timers.
>>
>> I wonder do we want to allow a timer loop to execute within a bundle?
>>
>> It would be possible to limit the number of iterations to perform during
>> one bundle similar to how runners limit the number of elements or the
>> duration of a bundle.
>>
>> -Max
>>
>

Reply via email to