I'm not very familiar with the details of portability, but for the direct runner Kenn suggested that it should be possible to implement the timer firing loop as:

 - extract all timers in the bundle

 - put them in a PriorityQueue (or similar)

 - extract the timer with the lowest timestamp, fire it, and put all updated timers back into the queue

 - repeat for all timers with a fire timestamp lower than or equal to the input watermark
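The loop above can be sketched roughly as follows. This is a minimal, runner-agnostic illustration under stated assumptions, not the direct runner's code: `SimpleTimer`, `PriorityTimerLoop`, and the `onFire` callback (standing in for the user's `@OnTimer` method) are hypothetical names, timestamps are plain `long` millis, and a timer set with an id that is already queued replaces the queued one.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

/** Hypothetical, simplified timer: an id plus a fire timestamp in millis. */
final class SimpleTimer {
  final String id;
  final long timestamp;

  SimpleTimer(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }
}

final class PriorityTimerLoop {
  /**
   * Fires every timer whose timestamp is <= inputWatermark, always picking the
   * earliest one next. onFire stands in for the user's @OnTimer method and
   * returns the timers that the fired timer set. Returns timers in firing order.
   */
  static List<SimpleTimer> fireEligible(
      List<SimpleTimer> bundleTimers,
      long inputWatermark,
      Function<SimpleTimer, List<SimpleTimer>> onFire) {
    // Extract all timers of the bundle into a queue ordered by timestamp.
    PriorityQueue<SimpleTimer> queue =
        new PriorityQueue<>(Comparator.comparingLong((SimpleTimer t) -> t.timestamp));
    queue.addAll(bundleTimers);

    List<SimpleTimer> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().timestamp <= inputWatermark) {
      // Fire the timer with the lowest timestamp.
      SimpleTimer next = queue.poll();
      fired.add(next);
      // Put all updated timers back; a timer with the same id replaces the
      // queued one, mirroring how re-setting a timer overwrites it.
      for (SimpleTimer updated : onFire.apply(next)) {
        queue.removeIf(t -> t.id.equals(updated.id));
        queue.add(updated);
      }
    }
    return fired;
  }
}
```

A timer that keeps re-setting itself at or before the input watermark would prevent this loop from terminating, so a runner using this approach would still need some additional bound.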

That would work for the direct runner; it would just be a little ugly to do given how the code is currently structured (how timer delegation works), and doing it 'cleanly' would mean quite a large refactor. Maybe a similar approach would be doable in Flink? Related issues are [1], [2] and [3].

[1] https://issues.apache.org/jira/browse/BEAM-8543

[2] https://issues.apache.org/jira/browse/BEAM-8460

[3] https://issues.apache.org/jira/browse/BEAM-8459

Jan

On 4/14/20 10:20 AM, Maximilian Michels wrote:
Hey Jan,

I just saw your message, since you posted right before I replied. What you
describe is precisely what I was experiencing. I also solved it the same
way, i.e. by pushing back a newly set timer to the next bundle. Note that
there is no other way in portability, because we can't fire timers once
we have closed the current bundle; we need to close the bundle to
receive all output, which includes timers. It's definitely not efficient,
but it appears that this behavior is even desired by some runners, e.g.
Dataflow.

Thanks,
Max

On 13.04.20 18:58, Jan Lukavský wrote:
This is probably related to the issue I was having with the direct runner
and timer ordering. The problem is that there might be multiple timers
(for a given key) inside a bundle, and each timer might set another timer.
To ensure timer ordering, timers must be fired one at a time, and when a
fired timer sets a timer for a time preceding the current input watermark,
the new timer and all remaining timers are pushed back to the next bundle.
That was the simplest yet efficient enough implementation for the direct
runner (see [1]); for other runners, better alternatives might exist (e.g.
what was discussed in [2]).
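The push-back strategy described above can be sketched like this. It is a simplified illustration under stated assumptions, not the code behind [1]: `PendingTimer`, `BundleOutcome`, `PushBackTimerFiring`, and the `onFire` callback are hypothetical names, timestamps are `long` millis, "due" means a timestamp lower than or equal to the input watermark, and overwrites of an existing timer by id are ignored for brevity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical timer: an id plus a fire timestamp in millis. */
final class PendingTimer {
  final String id;
  final long timestamp;

  PendingTimer(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }
}

/** Result of one bundle: timers fired now and timers left for the next bundle. */
final class BundleOutcome {
  final List<PendingTimer> fired = new ArrayList<>();
  final List<PendingTimer> pushedBack = new ArrayList<>();
}

final class PushBackTimerFiring {
  /** bundleTimers are assumed to be already due at inputWatermark. */
  static BundleOutcome processBundle(
      List<PendingTimer> bundleTimers,
      long inputWatermark,
      Function<PendingTimer, List<PendingTimer>> onFire) {
    List<PendingTimer> sorted = new ArrayList<>(bundleTimers);
    sorted.sort(Comparator.comparingLong((PendingTimer t) -> t.timestamp));

    BundleOutcome outcome = new BundleOutcome();
    for (int i = 0; i < sorted.size(); i++) {
      PendingTimer timer = sorted.get(i);
      // Fire timers strictly one at a time, in timestamp order.
      outcome.fired.add(timer);
      List<PendingTimer> newTimers = onFire.apply(timer);
      // Newly set timers are never fired within the current bundle.
      outcome.pushedBack.addAll(newTimers);
      boolean setsDueTimer = newTimers.stream().anyMatch(t -> t.timestamp <= inputWatermark);
      if (setsDueTimer) {
        // The new timer may need to fire before the remaining ones, so the
        // remaining timers are pushed back to the next bundle as well.
        outcome.pushedBack.addAll(sorted.subList(i + 1, sorted.size()));
        break;
      }
    }
    return outcome;
  }
}
```

The trade-off is the one mentioned in the thread: ordering is preserved cheaply, at the cost of extra bundles whenever timers set other due timers.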

Jan

[1] https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java#L253

[2] https://github.com/apache/beam/pull/9190

On 4/13/20 6:01 PM, Reuven Lax wrote:
I'm not sure I understand the difference: do any "classic" runners
add new timers to the bundle? I know that at least the Dataflow runner
would end up putting the new timer in a new bundle.

One thing we do need to ensure is that modifications to timers are
reflected within a bundle. So if a bundle contains a processElement call
and a processTimer call, and the processElement call modifies the timer,
that modification should be reflected when the timer fires.

On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <[email protected]> wrote:

     In non-portable implementations, you would have to wait until the
     element/timer had finished processing before you could process any
     newly created timers. Timers created with the same
     key+window+timer family overwrite existing timers that have been
     set, so a timer can be overwritten multiple times while an
     element/timer is being processed. You wouldn't want to process a
     newly created timer with incorrect values, or process a timer you
     shouldn't have.
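To illustrate the overwrite semantics described above, here is a minimal sketch in which timers set while one element/timer is being processed are buffered by key+window+timer family and only committed once that processing has finished. `TimerKey` and `ElementScopedTimers` are hypothetical names, not Beam APIs, and windows are modeled as plain strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical timer identity: key + window + timer family. */
final class TimerKey {
  final String key;
  final String window;
  final String family;

  TimerKey(String key, String window, String family) {
    this.key = key;
    this.window = window;
    this.family = family;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof TimerKey)) return false;
    TimerKey other = (TimerKey) o;
    return key.equals(other.key) && window.equals(other.window) && family.equals(other.family);
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, window, family);
  }
}

final class ElementScopedTimers {
  // Timers set while the current element/timer is still being processed.
  private final Map<TimerKey, Long> inProgress = new HashMap<>();
  // Timers that the firing logic is allowed to see.
  private final Map<TimerKey, Long> committed = new HashMap<>();

  void setTimer(String key, String window, String family, long timestamp) {
    // Same key+window+family: a later call overwrites the earlier value.
    inProgress.put(new TimerKey(key, window, family), timestamp);
  }

  /** Called once the element/timer has finished processing. */
  void finishElement() {
    committed.putAll(inProgress);
    inProgress.clear();
  }

  Map<TimerKey, Long> committedTimers() {
    return Collections.unmodifiableMap(committed);
  }
}
```

Because nothing is committed before `finishElement()`, an intermediate value of a timer that is overwritten during processing can never be fired.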

     In portable implementations, you can only safely say that element
     processing is done when the bundle completes. There could be value
     in exposing when an element is done, since this could be useful in
     other parts of the system, such as knowing when a large GBK value is done.

     On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels <[email protected]> wrote:

         Hi,

         In the "classic" Java mode, one can set timers which in turn can
         set timers, which makes it possible to create a timer loop, e.g.:

         // Declaration of the timer used below (event-time timer)
         @TimerId("timer")
         private final TimerSpec timerSpec =
             TimerSpecs.timer(TimeDomain.EVENT_TIME);

         @ProcessElement
         public void processElement(
             ProcessContext context, @TimerId("timer") Timer timer) {
           // Initial timer
           timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
         }

         @OnTimer("timer")
         public void onTimer(
             OnTimerContext context,
             @TimerId("timer") Timer timer) {
           // Trigger again and start endless loop
           timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
         }


         In portability, since we are only guaranteed to receive the timers
         at the end of a bundle, when we flush the outputs and have closed
         the inputs, it looks like this behavior can only be supported by
         starting a new bundle and executing the deferred timers. This also
         means holding back the watermark to allow for these loops. Plus,
         starting a new bundle comes with some cost.
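A simplified sketch of the watermark hold mentioned above: while timers are deferred to a later bundle, the output watermark is not allowed to advance past the earliest output timestamp among them. `DeferredTimer` and `WatermarkHold` are hypothetical names, timestamps are `long` millis, and a real runner would also have to account for any other holds it tracks.

```java
import java.util.List;

/** Hypothetical deferred timer: when it fires and which output timestamp it holds. */
final class DeferredTimer {
  final long fireTimestamp;
  final long outputTimestamp;

  DeferredTimer(long fireTimestamp, long outputTimestamp) {
    this.fireTimestamp = fireTimestamp;
    this.outputTimestamp = outputTimestamp;
  }
}

final class WatermarkHold {
  /**
   * Output watermark while the given timers wait for a later bundle: it may
   * advance neither beyond the input watermark nor beyond any deferred
   * timer's output timestamp.
   */
  static long outputWatermark(long inputWatermark, List<DeferredTimer> deferred) {
    long watermark = inputWatermark;
    for (DeferredTimer timer : deferred) {
      watermark = Math.min(watermark, timer.outputTimestamp);
    }
    return watermark;
  }
}
```

In this simplified model, the `new Instant(0)` output timestamp from the example above would pin the output watermark at the epoch for as long as the loop keeps deferring timers.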

         Another possibility would be to allow a direct feedback loop, i.e.
         when the bundle closes and the timers fire, we can still set new
         timers.

         I wonder: do we want to allow a timer loop to execute within a
         bundle?

         It would be possible to limit the number of iterations performed
         during one bundle, similar to how runners limit the number of
         elements or the duration of a bundle.
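The iteration limit proposed above could look roughly like this: timers that become due while the bundle is open are fired in timestamp order until a cap is reached, and whatever is still due is handed to the next bundle. `LoopTimer`, `LoopResult`, `BoundedTimerLoop`, `maxIterations`, and the `onFire` callback are hypothetical; timestamps are `long` millis, and overwrites by timer id are ignored for brevity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

/** Hypothetical timer: an id plus a fire timestamp in millis. */
final class LoopTimer {
  final String id;
  final long timestamp;

  LoopTimer(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }
}

final class LoopResult {
  final int firings;
  final List<LoopTimer> deferred;

  LoopResult(int firings, List<LoopTimer> deferred) {
    this.firings = firings;
    this.deferred = deferred;
  }
}

final class BoundedTimerLoop {
  static LoopResult run(
      List<LoopTimer> initial,
      long inputWatermark,
      int maxIterations,
      Function<LoopTimer, List<LoopTimer>> onFire) {
    PriorityQueue<LoopTimer> ready =
        new PriorityQueue<>(Comparator.comparingLong((LoopTimer t) -> t.timestamp));
    List<LoopTimer> deferred = new ArrayList<>();
    for (LoopTimer t : initial) {
      route(t, inputWatermark, ready, deferred);
    }
    int firings = 0;
    // Direct feedback loop inside the bundle, capped like an element-count limit.
    while (!ready.isEmpty() && firings < maxIterations) {
      LoopTimer next = ready.poll();
      firings++;
      for (LoopTimer t : onFire.apply(next)) {
        route(t, inputWatermark, ready, deferred);
      }
    }
    // Any timer still due after the cap is reached goes to the next bundle.
    deferred.addAll(ready);
    return new LoopResult(firings, deferred);
  }

  // Due timers may fire in this bundle; later ones wait for the watermark.
  private static void route(
      LoopTimer t, long inputWatermark, PriorityQueue<LoopTimer> ready, List<LoopTimer> deferred) {
    if (t.timestamp <= inputWatermark) {
      ready.add(t);
    } else {
      deferred.add(t);
    }
  }
}
```

With a timer that re-sets itself at its own timestamp, as in the endless loop above, this fires exactly `maxIterations` times and defers one timer to the next bundle.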

         -Max
