reuvenlax commented on a change in pull request #16650:
URL: https://github.com/apache/beam/pull/16650#discussion_r796931971
##########
File path:
runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -174,45 +168,46 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
for (WindowedValue<KV<K, InputT>> windowedValue :
gbkResult.getValue().elementsIterable()) {
delegateEvaluator.processElement(windowedValue);
}
- PriorityQueue<TimerData> toBeFiredTimers =
- new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
- Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
for (TimerData timerData : gbkResult.getValue().timersIterable()) {
- toBeFiredTimers.add(timerData);
- switch (timerData.getDomain()) {
- case EVENT_TIME:
- maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
- break;
- case PROCESSING_TIME:
- maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
- break;
- case SYNCHRONIZED_PROCESSING_TIME:
- maxSynchronizedProcessingTime =
- Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
+ // Get any new or modified timers that are earlier than the current one. In order to
+ // maintain timer ordering,
+ // we need to fire these timers first.
+ NavigableSet<TimerData> earlierTimers =
+ timerInternals.getModifiedTimersOrdered(timerData.getDomain()).headSet(timerData, true);
+ while (!earlierTimers.isEmpty()) {
+ TimerData insertedTimer = earlierTimers.pollFirst();
+ if (timerModified(insertedTimer)) {
+ continue;
+ }
+ // Make sure to register this timer as deleted. This could be a timer that was originally
+ // set for the future
+ // and not in the bundle but was reset to an earlier time in this bundle. If we don't
+ // explicitly delete the
+ // future timer, then it will still fire.
+ timerInternals.deleteTimer(insertedTimer);
+ processTimer(insertedTimer, gbkResult.getValue().key());
Review comment:
The loop always polls the earliest timer in the NavigableSet. Because `headSet` returns a view
backed by the set it was called on, if processing a timer sets a new timer at or before
`timerData`, the next call to `pollFirst()` returns that new timer. If the user does this
consistently, this is of course an infinite loop. However, this behavior (resetting a timer into
the past) is generally disallowed by Beam unless the user explicitly allows it by setting the
allowed timestamp skew.
There is a test of looping timers in ParDoTest.
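A minimal, self-contained sketch of the polling behavior described above, using a plain `java.util.TreeSet` rather than the direct runner's timer bookkeeping. Timers are modeled as bare `long` timestamps; `fireEarlier` and the `resetTo` map are hypothetical stand-ins for `processTimer` and for user code that resets a timer while it fires. This is an illustration of the `headSet` view semantics, not the runner's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

public class EarlierTimerLoopSketch {

  /**
   * Fires every pending timestamp at or before {@code current}, earliest first.
   * Hypothetical simplification: timers are plain longs, and {@code resetTo} maps a
   * fired timestamp to a new timestamp that the "user" schedules while it fires.
   */
  static List<Long> fireEarlier(NavigableSet<Long> pending, long current, Map<Long, Long> resetTo) {
    List<Long> fired = new ArrayList<>();
    // headSet returns a live view backed by `pending`, so timers added to `pending`
    // while looping that sort at or before `current` become visible to pollFirst().
    NavigableSet<Long> earlier = pending.headSet(current, true);
    while (!earlier.isEmpty()) {
      long t = earlier.pollFirst(); // also removes t from `pending`
      fired.add(t);
      Long next = resetTo.get(t);
      if (next != null) {
        pending.add(next); // may land inside the view and be fired in this same loop
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    NavigableSet<Long> pending = new TreeSet<>(List.of(10L, 30L, 50L));
    // Firing the timer at 30 sets a new timer at 20, i.e. into the past relative to 30.
    List<Long> fired = fireEarlier(pending, 40L, Map.of(30L, 20L));
    System.out.println(fired);   // [10, 30, 20]
    System.out.println(pending); // [50]
  }
}
```

The timer at 20 is picked up by the next `pollFirst()` even though it was added after the loop started. If `resetTo` instead formed a cycle such as `Map.of(30L, 20L, 20L, 30L)`, every firing would schedule another timer inside the view and the loop would never terminate, which is the infinite-loop case the comment mentions.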