With portability, you can still create "bundles" that contain only timers. You may want to break fusion between the stateful DoFn and the downstream transforms so that the bundle doesn't do a lot of additional processing that keeps the timer loop from being short.
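The sketch below is a rough, plain-Java illustration of timer-only bundles combined with the per-bundle iteration limit that Max proposes further down the thread. It models a hypothetical runner loop and uses no Beam classes. `PendingTimer`, `TimerOnlyBundles`, `maxFiringsPerBundle` and `maxBundles` are made-up names. Timers set while firing can fire in the same bundle until the cap is reached, and anything left over carries into the next timer-only bundle. Timer ordering is deliberately ignored (FIFO).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical timer model (not a Beam class): a key plus an event-time timestamp.
record PendingTimer(String key, long timestamp) {}

// Hypothetical runner-side helper illustrating timer-only bundles with a firing cap.
final class TimerOnlyBundles {
  private TimerOnlyBundles() {}

  /**
   * Fires timers in timer-only bundles. Each bundle fires at most
   * maxFiringsPerBundle timers; leftovers carry over to the next bundle.
   * Stops after maxBundles bundles so an endless timer loop still terminates.
   * Returns the timers fired in each bundle, in firing order.
   */
  static List<List<PendingTimer>> run(
      List<PendingTimer> initialTimers,
      Function<PendingTimer, List<PendingTimer>> onTimer,
      int maxFiringsPerBundle,
      int maxBundles) {
    List<List<PendingTimer>> bundles = new ArrayList<>();
    // FIFO on purpose: this sketch is about bundle boundaries, not timer ordering.
    Deque<PendingTimer> pending = new ArrayDeque<>(initialTimers);
    while (!pending.isEmpty() && bundles.size() < maxBundles) {
      List<PendingTimer> firedInBundle = new ArrayList<>();
      while (!pending.isEmpty() && firedInBundle.size() < maxFiringsPerBundle) {
        PendingTimer timer = pending.pollFirst();
        firedInBundle.add(timer);
        // Feedback loop: newly set timers are appended to the pending queue and
        // fire in this bundle only if the cap has not been reached yet.
        pending.addAll(onTimer.apply(timer));
      }
      bundles.add(firedInBundle);
    }
    return bundles;
  }
}
```

With a callback that always re-sets the timer, as in Max's endless-loop example, the cap turns one unbounded bundle into a sequence of short ones. The `maxBundles` parameter exists only so the sketch itself terminates.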
On Tue, Apr 14, 2020 at 2:44 AM Jan Lukavský wrote:
> I'm not quite familiar with the details of portability, but for the direct
> runner Kenn suggested that it should be possible to do the timer firing
> loop as:
>
> - extract all timers in the bundle
>
> - put them in a PriorityQueue (or similar)
>
> - extract the timer with the lowest timestamp, fire it, and put all updated
> timers back into the queue
>
> - repeat for all timers with a fire timestamp lower than or equal to the
> input watermark
>
> That would work for the direct runner. It would just be a little ugly to do
> with the way the code is currently structured (how timer delegation works),
> and doing it 'cleanly' would mean quite a large refactor. Maybe a similar
> approach would be doable in Flink? Related issues are [1], [2] and [3].
>
> [1] https://issues.apache.org/jira/browse/BEAM-8543
>
> [2] https://issues.apache.org/jira/browse/BEAM-8460
>
> [3] https://issues.apache.org/jira/browse/BEAM-8459
>
> Jan
>
> On 4/14/20 10:20 AM, Maximilian Michels wrote:
> > Hey Jan,
> >
> > Just saw your message since you posted right before I replied. What you
> > describe is precisely what I was experiencing. I also solved it the same
> > way, i.e. by pushing a newly set timer back to the next bundle. Note that
> > there is no other way in portability because we can't fire timers once
> > we have closed the current bundle; we need to close the bundle to
> > receive all output, which includes timers. It's definitely not efficient,
> > but it appears that this behavior is even desired by some runners, e.g.
> > Dataflow.
> >
> > Thanks,
> > Max
> >
> > On 13.04.20 18:58, Jan Lukavský wrote:
> >> This is probably related to the issue I was having with the Direct runner
> >> and timer ordering. The problem is that there might be multiple timers
> >> (for a given key) inside a bundle and that each timer might set another
> >> timer.
> >> To ensure timer ordering, timers must be fired one at a time, and when a
> >> fired timer sets a timer for a time preceding the current input watermark,
> >> the new timer and all remaining timers are pushed back to the next bundle.
> >> That was the simplest yet efficient enough implementation for the direct
> >> runner (see [1]); for other runners better alternatives might exist (e.g.
> >> what was discussed in [2]).
> >>
> >> Jan
> >>
> >> [1] https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java#L253
> >>
> >> [2] https://github.com/apache/beam/pull/9190
> >>
> >> On 4/13/20 6:01 PM, Reuven Lax wrote:
> >>> I'm not sure I understand the difference - do any "classic" runners
> >>> add new timers to the bundle? I know that at least the Dataflow runner
> >>> would end up putting the new timer in a new bundle.
> >>>
> >>> One thing we do need to ensure is that modifications to timers are
> >>> reflected in a bundle. So if a bundle contains a processElement and a
> >>> processTimer and the processElement modifies the timer, that should be
> >>> reflected in timer firing.
> >>>
> >>> On Mon, Apr 13, 2020 at 8:53 AM Luke Cwik wrote:
> >>>
> >>> In non-portable implementations you would have to wait until the
> >>> element/timer was finished processing before you could process any
> >>> newly created timers. Timers that are created with the same
> >>> key+window+timer family overwrite existing timers that have been
> >>> set, which can lead to a timer being overwritten multiple times
> >>> while an element/timer is being processed, and you wouldn't want to
> >>> process a newly created timer with the incorrect values or process
> >>> a timer you shouldn't have.
> >>>
> >>> In portable implementations, you can only safely say that element
> >>> processing is done when the bundle completes.
> >>> There could be value in exposing when an element is done, since this
> >>> could have uses in other parts of the system, such as knowing when a
> >>> large GBK value is done.
> >>>
> >>> On Mon, Apr 13, 2020 at 8:06 AM Maximilian Michels wrote:
> >>>
> >>> Hi,
> >>>
> >>> In the "classic" Java mode one can set timers which in turn can set
> >>> timers, which makes it possible to create a timer loop, e.g.:
> >>>
> >>> @ProcessElement
> >>> public void processElement(
> >>>     ProcessContext context, @TimerId("timer") Timer timer) {
> >>>   // Initial timer
> >>>   timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
> >>> }
> >>>
> >>> @OnTimer("timer")
> >>> public void onTimer(
> >>>     OnTimerContext context,
> >>>     @TimerId("timer") Timer timer) {
> >>>   // Trigger again and start endless loop
> >>>   timer.withOutputTimestamp(new Instant(0)).set(context.timestamp());
> >>> }
> >>>
> >>> In portability, since we are only guaranteed to receive the timers at
> >>> the end of a bundle, when we flush the outputs and have closed the
> >>> inputs, it looks like this behavior can only be supported by starting a
> >>> new bundle and executing the deferred timers. This also means holding
> >>> back the watermark to allow for these loops. Plus, starting a new bundle
> >>> comes with some cost.
> >>>
> >>> Another possibility would be to allow a direct feedback loop, i.e. when
> >>> the bundle closes and the timers fire, we can still set new timers.
> >>>
> >>> Do we want to allow a timer loop to execute within a bundle?
> >>>
> >>> It would be possible to limit the number of iterations to perform during
> >>> one bundle, similar to how runners limit the number of elements or the
> >>> duration of a bundle.
> >>>
> >>> -Max
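The firing loop Kenn suggested (quoted in Jan's message above) could look roughly like this. The sketch assumes a hypothetical in-memory timer store rather than any actual runner code. `TimerEntry.id` stands in for Luke's key+window+timer family, so setting the same id again overwrites the earlier timer. Overwritten entries stay in the `PriorityQueue` and are skipped when they reach its head. That lazy-deletion technique is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.Function;

// Hypothetical timer model: id stands in for key + window + timer family.
record TimerEntry(String id, long timestamp) {}

final class OrderedTimerLoop {
  // Live timers by id; setting an id that already exists overwrites it.
  private final Map<String, Long> live = new HashMap<>();
  // Min-heap by timestamp; may hold stale entries for overwritten timers.
  private final PriorityQueue<TimerEntry> queue =
      new PriorityQueue<>((a, b) -> Long.compare(a.timestamp(), b.timestamp()));

  void set(String id, long timestamp) {
    live.put(id, timestamp);
    queue.add(new TimerEntry(id, timestamp));
  }

  Map<String, Long> pending() {
    return Map.copyOf(live);
  }

  /**
   * Repeatedly fires the live timer with the lowest timestamp while that
   * timestamp is at or below inputWatermark. Timers set by onTimer go back
   * into the queue, so a new earlier timer fires before older later ones.
   */
  List<TimerEntry> fireUpTo(
      long inputWatermark, Function<TimerEntry, Map<String, Long>> onTimer) {
    List<TimerEntry> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().timestamp() <= inputWatermark) {
      TimerEntry next = queue.poll();
      Long current = live.get(next.id());
      if (current == null || current != next.timestamp()) {
        continue; // stale: this timer was overwritten or has already fired
      }
      live.remove(next.id());
      fired.add(next);
      // Put all updated timers back into the queue.
      onTimer.apply(next).forEach(this::set);
    }
    return fired;
  }
}
```

As in Max's example, a callback that keeps re-setting an eligible timer makes `fireUpTo` loop forever. A real runner would need a cap like the one Max suggests.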

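Jan's direct-runner approach can be sketched in the same style. In that approach, a fired timer that sets an already-eligible timer causes the rest of the bundle's timers to be pushed back. This is a simplified illustration, not the logic in `StatefulParDoEvaluatorFactory`. `SimpleTimer`, `BundleResult` and `process` are hypothetical names, and every input timer is assumed to be eligible. The sketch also reads "a time preceding the current input watermark" as "at or before the input watermark", which matches the eligibility rule in Kenn's loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

// Hypothetical timer model: a name plus an event-time timestamp.
record SimpleTimer(String name, long timestamp) {}

// What one bundle fired, and which timers the runner must hand to a later bundle.
record BundleResult(List<SimpleTimer> fired, List<SimpleTimer> deferred) {}

final class PushBackTimerBundle {
  private PushBackTimerBundle() {}

  /**
   * Fires timers one at a time in timestamp order. Assumes every input timer
   * is already eligible (timestamp <= inputWatermark). If a fired timer sets
   * an eligible timer, processing stops: the new timer and all remaining
   * timers are deferred to the next bundle so ordering can be re-established.
   */
  static BundleResult process(
      List<SimpleTimer> timersInBundle,
      long inputWatermark,
      Function<SimpleTimer, List<SimpleTimer>> onTimer) {
    PriorityQueue<SimpleTimer> queue =
        new PriorityQueue<>((a, b) -> Long.compare(a.timestamp(), b.timestamp()));
    queue.addAll(timersInBundle);
    List<SimpleTimer> fired = new ArrayList<>();
    List<SimpleTimer> deferred = new ArrayList<>();
    while (!queue.isEmpty()) {
      SimpleTimer timer = queue.poll();
      fired.add(timer);
      boolean pushBack = false;
      for (SimpleTimer created : onTimer.apply(timer)) {
        // Every newly set timer is handed back to the runner.
        deferred.add(created);
        if (created.timestamp() <= inputWatermark) {
          pushBack = true; // eligible now, so it may need to fire before the rest
        }
      }
      if (pushBack) {
        // Drain in timestamp order so the deferred list is deterministic.
        while (!queue.isEmpty()) {
          deferred.add(queue.poll());
        }
        break;
      }
    }
    return new BundleResult(fired, deferred);
  }
}
```

Compared with the priority-queue loop, this approach avoids re-ordering timers inside a bundle. The cost is an extra bundle whenever a timer re-arms itself, which matches the inefficiency Max describes for portability.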