With portability, you can still create "bundles" that contain only timers.
You may want to break fusion between the stateful DoFn and the downstream
transforms so that the bundle doesn't do a lot of additional processing that
prevents the timer loop from being short.

On Tue, Apr 14, 2020 at 2:44 AM Jan Lukavský wrote:

> I'm not very familiar with the details of portability, but for the direct
> runner Kenn suggested that it should be possible to implement the timer
> firing loop as:
>
>   - extract all timers in the bundle
>
>   - put them in a PriorityQueue (or similar)
>
>   - extract the timer with the lowest timestamp, fire it, and put all
> updated timers back into the queue
>
>   - repeat for all timers with a firing timestamp lower than or equal to
> the input watermark
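A minimal Java sketch of the loop in the list above, not the direct runner's actual code. `QueuedTimer`, `PriorityQueueTimerLoop` and the `onTimer` callback (standing in for `@OnTimer`, returning the timers it sets) are hypothetical. Because a timer re-set while still queued must overwrite the old one, the sketch uses lazy deletion: stale copies may remain in the queue, and only the copy matching the latest timestamp for that key and timer id fires.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.Function;

/** Hypothetical timer: a key, a timer id and a firing timestamp (millis). */
final class QueuedTimer {
  final String key;
  final String timerId;
  final long timestamp;

  QueuedTimer(String key, String timerId, long timestamp) {
    this.key = key;
    this.timerId = timerId;
    this.timestamp = timestamp;
  }

  String id() {
    return key + "/" + timerId;
  }

  @Override
  public String toString() {
    return id() + "@" + timestamp;
  }
}

final class PriorityQueueTimerLoop {
  /** Fires eligible timers in timestamp order; timers set while firing rejoin the queue. */
  static List<QueuedTimer> fireAll(
      List<QueuedTimer> bundleTimers,
      long inputWatermark,
      Function<QueuedTimer, List<QueuedTimer>> onTimer) {
    PriorityQueue<QueuedTimer> queue =
        new PriorityQueue<>(Comparator.comparingLong((QueuedTimer t) -> t.timestamp));
    // Latest timestamp per key + timer id; a newer set overwrites an older one.
    Map<String, Long> latest = new HashMap<>();
    for (QueuedTimer t : bundleTimers) {
      latest.put(t.id(), t.timestamp);
      queue.add(t);
    }
    List<QueuedTimer> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().timestamp <= inputWatermark) {
      QueuedTimer timer = queue.poll();
      Long current = latest.get(timer.id());
      if (current == null || current.longValue() != timer.timestamp) {
        continue; // stale copy of a timer that was overwritten or already fired
      }
      latest.remove(timer.id());
      fired.add(timer);
      for (QueuedTimer updated : onTimer.apply(timer)) {
        latest.put(updated.id(), updated.timestamp);
        queue.add(updated);
      }
    }
    // Live timers left in the queue are later than the watermark and wait for a later bundle.
    return fired;
  }
}
```

Because updated timers go straight back into the same queue, a timer set for an earlier time than a still-pending one fires first, which is what keeps the ordering correct without ending the bundle.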
>
> That would work for the direct runner; it would just be a little ugly to
> do with the way the code is currently structured (because of how timer
> delegation works), and doing it 'cleanly' would mean quite a large
> refactor. Maybe a similar approach would be doable in Flink? Related
> issues are [1], [2] and [3].
>
> [1] https://issues.apache.org/jira/browse/BEAM-8543
>
> [2] https://issues.apache.org/jira/browse/BEAM-8460
>
> [3] https://issues.apache.org/jira/browse/BEAM-8459
>
> Jan
>
> On 4/14/20 10:20 AM, Maximilian Michels wrote:
> > Hey Jan,
> >
> > Just saw your message, since you posted right before I replied. What you
> > describe is precisely what I was experiencing. I also solved it the same
> > way, i.e. by pushing a newly set timer back to the next bundle. Note that
> > there is no other way in portability, because we can't fire timers once
> > we have closed the current bundle, and we need to close the bundle to
> > receive all output, which includes timers. It's definitely not efficient,
> > but this behavior appears to be even desired by some runners, e.g.
> > Dataflow.
> >
> > Thanks,
> > Max
> >
> > On 13.04.20 18:58, Jan Lukavský wrote:
> >> This is probably related to the issue I was having with the direct
> >> runner and timer ordering. The problem is that there might be multiple
> >> timers (for a given key) inside a bundle, and each timer might set
> >> another timer. To ensure timer ordering, timers must be fired one at a
> >> time, and when a fired timer sets a timer for a time preceding the
> >> current input watermark, the new timer and all remaining timers are
> >> pushed back to the next bundle. That was the simplest yet
> >> efficient-enough implementation for the direct runner (see [1]); for
> >> other runners, better alternatives might exist (e.g. what was discussed
> >> in [2]).
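The one-timer-at-a-time approach with push-back can be sketched in Java as follows. This is a minimal illustration, not the code in `StatefulParDoEvaluatorFactory`: `BundleTimer`, `BundleOutcome`, `PushBackTimerFiring` and the `onTimer` callback are hypothetical, "eligible" is assumed to mean a timestamp less than or equal to the input watermark, and overwrites of existing timers are not modelled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical timer with a name and a firing timestamp (millis). */
final class BundleTimer {
  final String name;
  final long timestamp;

  BundleTimer(String name, long timestamp) {
    this.name = name;
    this.timestamp = timestamp;
  }

  @Override
  public String toString() {
    return name + "@" + timestamp;
  }
}

/** Timers fired in this bundle and timers pushed back to the next bundle. */
final class BundleOutcome {
  final List<BundleTimer> fired = new ArrayList<>();
  final List<BundleTimer> pushedBack = new ArrayList<>();
}

final class PushBackTimerFiring {
  static BundleOutcome processBundle(
      List<BundleTimer> timers,
      long inputWatermark,
      Function<BundleTimer, List<BundleTimer>> onTimer) {
    List<BundleTimer> sorted = new ArrayList<>(timers);
    sorted.sort(Comparator.comparingLong((BundleTimer t) -> t.timestamp));
    BundleOutcome outcome = new BundleOutcome();
    for (int i = 0; i < sorted.size(); i++) {
      BundleTimer timer = sorted.get(i);
      if (timer.timestamp > inputWatermark) {
        // Sorted order: this timer and all later ones are not eligible yet.
        outcome.pushedBack.addAll(sorted.subList(i, sorted.size()));
        break;
      }
      outcome.fired.add(timer); // fire exactly one timer at a time
      List<BundleTimer> newTimers = onTimer.apply(timer);
      outcome.pushedBack.addAll(newTimers);
      boolean eligibleNow = newTimers.stream().anyMatch(t -> t.timestamp <= inputWatermark);
      if (eligibleNow) {
        // A new timer might need to fire before the remaining ones: defer all of them.
        outcome.pushedBack.addAll(sorted.subList(i + 1, sorted.size()));
        break;
      }
    }
    return outcome;
  }
}
```

The next bundle simply calls `processBundle` again with `pushedBack`, so ordering is preserved at the cost of one extra bundle per eligible timer set during firing.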
> >>
> >> Jan
> >>
> >> [1] https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java#L253
> >>
> >> [2] https://github.com/apache/beam/pull/9190
> >>
> >> On 4/13/20 6:01 PM, Reuven Lax wrote:
> >>> I'm not sure I understand the difference: do any "classic" runners
> >>> add new timers to the bundle? I know that at least the Dataflow runner
> >>> would end up putting the new timer in a new bundle.
> >>>
> >>> One thing we do need to ensure is that modifications to timers are
> >>> reflected within a bundle. So if a bundle contains a processElement and
> >>> a processTimer, and the processElement modifies the timer, that should
> >>> be reflected when the timer fires.
> >>>
> >>> On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik wrote:
> >>>
> >>>      In non-portable implementations you would have to wait until the
> >>>      element/timer had finished processing before you could process
> >>>      any newly created timers. A timer created with the same
> >>>      key+window+timer family overwrites any existing timer that has
> >>>      been set, so a timer can be overwritten multiple times while an
> >>>      element/timer is being processed; you wouldn't want to process a
> >>>      newly created timer with incorrect values, or process a timer you
> >>>      shouldn't have.
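To make the overwrite semantics concrete, here is a minimal Java sketch, not Beam's implementation, of a hypothetical `PendingTimers` buffer keyed by key+window+timer family. Setting the same timer twice while one element is processed leaves only the last value, and the buffer is handed on only after the element/timer has finished, so no intermediate value is ever fired.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical buffer of timers set while one element (or timer) is being processed.
 * Timers are identified by key + window + timer family, so setting the same one
 * again overwrites the earlier value instead of adding a second timer.
 */
final class PendingTimers {
  private final Map<List<String>, Long> pending = new LinkedHashMap<>();

  void set(String key, String window, String timerFamily, long timestamp) {
    // List.of(...) has value-based equals/hashCode, so it works as a composite map key.
    pending.put(List.of(key, window, timerFamily), timestamp);
  }

  /** Called once the element/timer has finished: only the final values are handed on. */
  Map<List<String>, Long> drainAfterElement() {
    Map<List<String>, Long> finalTimers = new LinkedHashMap<>(pending);
    pending.clear();
    return finalTimers;
  }
}
```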
> >>>
> >>>      In portable implementations, you can only safely say that element
> >>>      processing is done when the bundle completes. There could be value
> >>>      in exposing when an element is done, since this could be useful in
> >>>      other parts of the system, such as knowing when a large GBK value
> >>>      is done.
> >>>
> >>>      On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels wrote:
> >>>
> >>>          Hi,
> >>>
> >>>          In the "classic" Java mode, one can set timers which in turn
> >>>          set timers, which makes it possible to create a timer loop,
> >>>          e.g.:
> >>>
> >>>          // Declares the event-time timer referenced below
> >>>          @TimerId("timer")
> >>>          private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
> >>>
> >>>          @ProcessElement
> >>>          public void processElement(
> >>>              ProcessContext context, @TimerId("timer") Timer timer) {
> >>>            // Initial timer
> >>>            timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
> >>>          }
> >>>
> >>>          @OnTimer("timer")
> >>>          public void onTimer(
> >>>              OnTimerContext context,
> >>>              @TimerId("timer") Timer timer) {
> >>>            // Trigger again and start an endless loop
> >>>            timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
> >>>          }
> >>>
> >>>
> >>>          In portability, since we are only guaranteed to receive the
> >>>          timers at the end of a bundle, when we flush the outputs and
> >>>          have closed the inputs, it looks like this behavior can only be
> >>>          supported by starting a new bundle and executing the deferred
> >>>          timers. This also means holding back the watermark to allow for
> >>>          these loops. Plus, starting a new bundle comes with some cost.
> >>>
> >>>          Another possibility would be to allow a direct feedback loop,
> >>>          i.e. when the bundle closes and the timers fire, we can still
> >>>          set new timers.
> >>>
> >>>          I wonder: do we want to allow a timer loop to execute within a
> >>>          bundle?
> >>>
> >>>          It would be possible to limit the number of iterations to
> >>>          perform during one bundle, similar to how runners limit the
> >>>          number of elements or the duration of a bundle.
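Capping the number of timer-loop iterations per bundle could look roughly like the following minimal Java sketch. Assumptions: timers for a single key are modelled as bare timestamps, and the hypothetical `onTimer` callback returns the timers that a firing sets. Timers set during firing are fed straight back into the loop, and anything past the watermark or beyond the cap is deferred to a later bundle (which, as noted above, means holding back the watermark for it).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

/** Timer timestamps fired within the bundle and those left for a later bundle. */
final class BoundedLoopResult {
  final List<Long> fired = new ArrayList<>();
  final List<Long> deferred = new ArrayList<>();
}

final class BoundedTimerLoop {
  /**
   * Fires timers (bare timestamps for one key) in order, feeding timers set by each
   * firing back into the loop, but stops after maxIterations firings.
   */
  static BoundedLoopResult run(
      List<Long> timers,
      long inputWatermark,
      int maxIterations,
      Function<Long, List<Long>> onTimer) {
    PriorityQueue<Long> queue = new PriorityQueue<>(timers);
    BoundedLoopResult result = new BoundedLoopResult();
    int iterations = 0;
    while (!queue.isEmpty() && queue.peek() <= inputWatermark && iterations < maxIterations) {
      long timestamp = queue.poll();
      result.fired.add(timestamp);
      queue.addAll(onTimer.apply(timestamp));
      iterations++;
    }
    // Timers past the watermark or beyond the cap are deferred to a later bundle.
    while (!queue.isEmpty()) {
      result.deferred.add(queue.poll());
    }
    return result;
  }
}
```

With a callback that re-sets the timer to its own timestamp, mirroring the endless loop in the example above, a cap of 3 fires the timer three times and defers one copy, so the bundle still terminates.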
> >>>
> >>>          -Max
> >>>
>
