This is probably related to the issue I was having with the Direct runner and
timer ordering. The problem is that there might be multiple timers (for a
given key) inside a bundle, and each timer might set another timer. To
ensure timer ordering, timers must be fired one at a time, and when a fired
timer sets a timer for a time preceding the current input watermark, the new
timer and all remaining timers are pushed back to the next bundle. That was
the simplest, yet efficient enough, implementation for the direct runner (see
[1]); for other runners, better alternatives might exist (e.g. what
was discussed in [2]).
Jan
[1] https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java#L253
[2] https://github.com/apache/beam/pull/9190
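A minimal sketch of the ordering rule described above, written as a self-contained Java simulation rather than the direct runner's actual code. Assumptions: timers are plain `long` timestamps for a single key, a timer is eligible when its timestamp is strictly before the input watermark, and `onTimer` is a hypothetical callback returning the timestamp of a newly set timer (or `null`). `OrderedTimerBundle` and `fireBundle` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongFunction;

/** Hypothetical model of firing one bundle's event-time timers for a single key. */
final class OrderedTimerBundle {

  /** Timers fired in this bundle, and timers still set afterwards (sorted). */
  static final class Result {
    final List<Long> fired;
    final List<Long> remaining;

    Result(List<Long> fired, List<Long> remaining) {
      this.fired = fired;
      this.remaining = remaining;
    }
  }

  /**
   * Fires eligible timers (timestamp < inputWatermark) one at a time in timestamp order.
   * If a fired timer sets a new timer that is itself already eligible, firing stops: the
   * new timer and all remaining timers are left for the next bundle, preserving ordering.
   */
  static Result fireBundle(List<Long> timers, long inputWatermark, LongFunction<Long> onTimer) {
    PriorityQueue<Long> pending = new PriorityQueue<>(timers);
    List<Long> fired = new ArrayList<>();
    while (!pending.isEmpty() && pending.peek() < inputWatermark) {
      long ts = pending.poll();
      fired.add(ts);
      Long newTimer = onTimer.apply(ts); // stands in for the @OnTimer body
      if (newTimer != null) {
        pending.add(newTimer);
        if (newTimer < inputWatermark) {
          break; // defer the new timer and everything left to the next bundle
        }
      }
    }
    List<Long> remaining = new ArrayList<>(pending);
    Collections.sort(remaining);
    return new Result(fired, remaining);
  }
}
```

With timers 1, 5 and 8, a watermark of 10, and a callback that sets a timer at 3 when timer 1 fires, the first bundle fires only 1 and defers 3, 5 and 8; a second bundle then fires those, so the overall firing order stays 1, 3, 5, 8.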
On 4/13/20 6:01 PM, Reuven Lax wrote:
I'm not sure I understand the difference - do any "classic" runners
add new timers to the bundle? I know that at least the Dataflow runner
would end up putting the new timer in a new bundle.
One thing we do need to ensure is that modifications to timers are
reflected within a bundle. So if a bundle contains a processElement and a
processTimer, and the processElement modifies the timer, that modification
should be reflected in the timer firing.
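To make that requirement concrete, a minimal Java sketch assuming a hypothetical per-bundle timer store keyed by a timer identity string such as `"k1/timer"` (`BundleTimerState` is not a Beam API). Element processing calls `set`/`delete`, and the timers fired for the bundle are read from the store's current contents, so a timer moved or deleted by an earlier element fires at its new time or not at all.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical timer state shared by element and timer processing within one bundle. */
final class BundleTimerState {
  // Timer identity (e.g. "key/timerId") -> currently set timestamp.
  // TreeMap only makes iteration deterministic; it is not timestamp order.
  private final Map<String, Long> timers = new TreeMap<>();

  /** Sets or overwrites a timer, e.g. from processElement. */
  void set(String timerId, long timestamp) {
    timers.put(timerId, timestamp);
  }

  /** Deletes a timer so that it no longer fires. */
  void delete(String timerId) {
    timers.remove(timerId);
  }

  /**
   * Fires and removes every timer eligible at this watermark. It reads the state as it is
   * now, so changes made earlier in the bundle are honored.
   */
  List<String> fireEligible(long inputWatermark) {
    List<String> fired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> timer = it.next();
      if (timer.getValue() < inputWatermark) {
        fired.add(timer.getKey() + "@" + timer.getValue());
        it.remove();
      }
    }
    return fired;
  }
}
```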
On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <lc...@google.com> wrote:
In non-portable implementations you would have to wait until the
element/timer was finished processing before you could process any
newly created timers. Timers that are created with the same
key+window+timer family overwrite existing timers that have been
set, which can lead to a timer being overwritten multiple times
while an element/timer is being processed. You wouldn't want to
process a newly created timer with incorrect values, or process
a timer you shouldn't have.
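One way to picture the non-portable behavior is a staging step: a minimal Java sketch, assuming hypothetical runner bookkeeping (`StagedTimers`, `timerKey` and `runCallback` are illustrative names, not Beam APIs). Timers set during a callback go into a staging map where the last write per key+window+timer family wins, and they become visible to the firing logic only after the callback returns, so an intermediate value is never fired.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical runner-side bookkeeping for timers set during one element/timer callback. */
final class StagedTimers {
  /** Timer identity: setting a timer with the same key+window+timer family overwrites it. */
  static String timerKey(String key, String window, String timerFamily) {
    return key + "|" + window + "|" + timerFamily;
  }

  // Timers the runner may consider for firing; updated only between callbacks.
  final Map<String, Long> committed = new LinkedHashMap<>();

  /**
   * Runs one callback. Timers it sets land in a staging map where the last write per
   * identity wins; they are committed only after the callback returns.
   */
  void runCallback(Consumer<Map<String, Long>> callback) {
    Map<String, Long> staged = new LinkedHashMap<>();
    callback.accept(staged);
    committed.putAll(staged); // overwrites any previously committed timer with the same identity
  }
}
```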
In portable implementations, you can only safely say that element
processing is done when the bundle completes. There could be value
in exposing when an element is done, since this could be useful in
other parts of the system, such as knowing when a large GBK value is done.
On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels <m...@apache.org> wrote:
Hi,
In the "classic" Java mode, one can set timers which in turn can set
timers, which makes it possible to create a timer loop, e.g.:
// Timer declaration (assumed to be an event-time timer)
@TimerId("timer")
private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processElement(
    ProcessContext context, @TimerId("timer") Timer timer) {
  // Initial timer
  timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
}

@OnTimer("timer")
public void onTimer(
    OnTimerContext context, @TimerId("timer") Timer timer) {
  // Trigger again and start endless loop
  timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
}
In portability, since we are only guaranteed to receive the timers at
the end of a bundle, when we flush the outputs and have closed the
inputs, it looks like this behavior can only be supported by starting a
new bundle and executing the deferred timers. This also means holding
back the watermark to allow for these loops. Plus, starting a new bundle
comes with some cost.
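A rough Java simulation of the deferred-timer approach described above. Assumptions: `processBundle` is a hypothetical stand-in for the SDK harness (the timers it sets only become visible to the runner when the bundle returns), timers are `long` timestamps eligible when strictly before the input watermark, and, as a simplification, the watermark hold is taken from the earliest pending timer's firing timestamp rather than from its output timestamp. `maxBundles` is only a safety cap for the simulation. Each loop iteration costs one bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical runner loop for an SDK harness that reports new timers only at bundle end. */
final class DeferredTimerLoop {

  /**
   * Fires eligible timers one bundle at a time. processBundle takes the timers to fire and
   * returns the timers they set. Returns the watermark hold computed after each bundle.
   */
  static List<Long> run(List<Long> initialTimers, long inputWatermark,
      UnaryOperator<List<Long>> processBundle, int maxBundles) {
    List<Long> holds = new ArrayList<>();
    List<Long> pending = new ArrayList<>(initialTimers);
    for (int bundle = 0; bundle < maxBundles; bundle++) {
      List<Long> eligible = new ArrayList<>();
      List<Long> later = new ArrayList<>();
      for (long t : pending) {
        if (t < inputWatermark) {
          eligible.add(t);
        } else {
          later.add(t);
        }
      }
      if (eligible.isEmpty()) {
        break; // nothing left to fire at this watermark
      }
      // New timers come back only when the bundle finishes, so they need another bundle.
      pending = later;
      pending.addAll(processBundle.apply(eligible));
      // Simplification: hold the output watermark at the earliest pending timer timestamp.
      long hold = inputWatermark;
      for (long t : pending) {
        hold = Math.min(hold, t);
      }
      holds.add(hold);
    }
    return holds;
  }
}
```

With a timer that re-sets itself at the same timestamp, as in the example above, every bundle produces another bundle and the hold never moves past that timestamp; only the cap stops the simulation.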
Another possibility would be to allow a direct feedback loop, i.e. when
the bundle closes and the timers fire, we can still set new timers.
I wonder: do we want to allow a timer loop to execute within a bundle?
It would be possible to limit the number of iterations to perform during
one bundle, similar to how runners limit the number of elements or the
duration of a bundle.
-Max
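A minimal Java sketch of the in-bundle feedback loop with an iteration cap, as a plain simulation. Assumptions: timers are `long` timestamps for one key, a timer is eligible when strictly before the input watermark, `onTimer` is a hypothetical callback returning the timestamp of any timer it sets, and "iterations" are counted as timer firings via a hypothetical `maxFirings` parameter. New timers re-enter a priority queue, so they still fire in timestamp order relative to the remaining timers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongFunction;

/** Hypothetical in-bundle feedback loop: timers set while firing may fire in the same bundle. */
final class InBundleTimerLoop {

  /** Timers fired in this bundle, and timers still set afterwards (sorted). */
  static final class Outcome {
    final List<Long> fired;
    final List<Long> remaining;

    Outcome(List<Long> fired, List<Long> remaining) {
      this.fired = fired;
      this.remaining = remaining;
    }
  }

  /**
   * Fires eligible timers in timestamp order. A timer set by onTimer goes straight back into
   * the queue and can fire in this bundle. At most maxFirings timers fire; whatever is left
   * stays set for the next bundle.
   */
  static Outcome fire(List<Long> timers, long inputWatermark, int maxFirings,
      LongFunction<Long> onTimer) {
    PriorityQueue<Long> queue = new PriorityQueue<>(timers);
    List<Long> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek() < inputWatermark && fired.size() < maxFirings) {
      long ts = queue.poll();
      fired.add(ts);
      Long next = onTimer.apply(ts); // stands in for the @OnTimer body
      if (next != null) {
        queue.add(next);
      }
    }
    List<Long> remaining = new ArrayList<>(queue);
    Collections.sort(remaining);
    return new Outcome(fired, remaining);
  }
}
```

For a self-rescheduling timer like the one in the example above, the cap is what ends the bundle; a loop that stops on its own finishes within a single bundle without reaching it.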