This is probably related to the issue I was having with the Direct runner and timer ordering. The problem is that a bundle might contain multiple timers for a given key, and each of those timers might set another timer. To ensure timer ordering, timers must be fired one at a time, and when a fired timer sets a timer for a time preceding the current input watermark, the new timer and all remaining timers are pushed back to the next bundle. That was the simplest implementation that was still efficient enough for the Direct runner (see [1]); better alternatives might exist for other runners (e.g. what was discussed in [2]).
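
To make the idea concrete, here is a minimal sketch of the per-key firing loop described above. It is not the code from [1]: the class, the `fireBundle` method, the `BundleResult` type and the callback shape are all hypothetical, timers are reduced to plain `long` timestamps, and each callback is assumed to set at most one new timer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.function.Function;

/** Hypothetical sketch of per-key timer firing; not the Direct runner's actual code. */
final class TimerOrderingSketch {

  /** Outcome of one bundle for a single key (timers reduced to timestamps). */
  static final class BundleResult {
    final List<Long> fired = new ArrayList<>();    // fired in this bundle, in order
    final List<Long> deferred = new ArrayList<>(); // pushed back to the next bundle
    final List<Long> future = new ArrayList<>();   // new timers not yet eligible
  }

  /**
   * Fires eligible timers one at a time in timestamp order. The onTimer callback
   * returns the timestamp of the timer it sets, if any (assumption: at most one).
   */
  static BundleResult fireBundle(
      List<Long> eligibleTimers,
      long inputWatermark,
      Function<Long, Optional<Long>> onTimer) {
    PriorityQueue<Long> pending = new PriorityQueue<>(eligibleTimers);
    BundleResult result = new BundleResult();
    while (!pending.isEmpty()) {
      long timestamp = pending.poll();
      result.fired.add(timestamp);
      Optional<Long> created = onTimer.apply(timestamp);
      if (!created.isPresent()) {
        continue;
      }
      if (created.get() < inputWatermark) {
        // The new timer is already eligible, so it may have to fire before
        // timers still pending here: defer it and everything that remains.
        result.deferred.add(created.get());
        result.deferred.addAll(pending);
        pending.clear();
      } else {
        // Not eligible yet; it fires once the watermark advances.
        result.future.add(created.get());
      }
    }
    Collections.sort(result.deferred);
    return result;
  }

  public static void main(String[] args) {
    // The timer at 10 sets a new timer at 15, which precedes the watermark (100).
    BundleResult r = fireBundle(Arrays.asList(10L, 20L, 30L), 100L,
        ts -> ts == 10L ? Optional.of(15L) : Optional.<Long>empty());
    System.out.println("fired=" + r.fired + " deferred=" + r.deferred);
    // prints: fired=[10] deferred=[15, 20, 30]
  }
}
```

In the example, the timers at 20 and 30 are not fired in the same bundle, because the new timer at 15 has to fire before them; the next bundle then sees 15, 20, 30 in order.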

Jan

[1] https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java#L253

[2] https://github.com/apache/beam/pull/9190

On 4/13/20 6:01 PM, Reuven Lax wrote:
I'm not sure I understand the difference - do any "classic" runners add new timers to the bundle? I know that at least the Dataflow runner would end up putting the new timer in a new bundle.

One thing we do need to ensure is that modifications to timers are reflected in a bundle. So if a bundle contains a processElement and a processTimer and the processElement modifies the timer, that should be reflected in timer firing.

On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik <lc...@google.com> wrote:

    In non-portable implementations, you would have to wait until the
    element/timer finished processing before you could process any
    newly created timers. Timers created with the same
    key+window+timer family overwrite existing timers that have been
    set, so a timer can be overwritten multiple times while an
    element/timer is being processed. You wouldn't want to process a
    newly created timer with incorrect values, or process a timer
    you shouldn't have.

    In portable implementations, you can only safely say that element
    processing is done when the bundle completes. There could be value
    in exposing when an element is done since this could have usage in
    other parts of the system such as when a large GBK value is done.

    On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels <m...@apache.org> wrote:

        Hi,

        In the "classic" Java mode one can set timers which in turn
        can set timers, which makes it possible to create a timer
        loop, e.g.:

        @ProcessElement
        public void processElement(
            ProcessContext context, @TimerId("timer") Timer timer) {
          // Initial timer
          timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
        }

        @OnTimer("timer")
        public void onTimer(
            OnTimerContext context,
            @TimerId("timer") Timer timer) {
          // Trigger again and start endless loop
          timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
        }


        In portability, since we are only guaranteed to receive the
        timers at the end of a bundle, when we flush the outputs and
        have closed the inputs, it looks like this behavior can only
        be supported by starting a new bundle and executing the
        deferred timers. This also means holding back the watermark
        to allow for these loops. Plus, starting a new bundle comes
        with some cost.

        Another possibility would be to allow a direct feedback loop,
        i.e. when the bundle closes and the timers fire, we can still
        set new timers.

        I wonder: do we want to allow a timer loop to execute within
        a bundle?

        It would be possible to limit the number of iterations to
        perform during one bundle, similar to how runners limit the
        number of elements or the duration of a bundle.

        -Max
