je-ik commented on a change in pull request #16650:
URL: https://github.com/apache/beam/pull/16650#discussion_r797399576
##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -174,45 +168,46 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
delegateEvaluator.processElement(windowedValue);
}
- PriorityQueue<TimerData> toBeFiredTimers =
- new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
- Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
for (TimerData timerData : gbkResult.getValue().timersIterable()) {
- toBeFiredTimers.add(timerData);
- switch (timerData.getDomain()) {
- case EVENT_TIME:
- maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
- break;
- case PROCESSING_TIME:
- maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
- break;
- case SYNCHRONIZED_PROCESSING_TIME:
- maxSynchronizedProcessingTime =
- Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
+ // Get any new or modified timers that are earlier than the current one. In order to
+ // maintain timer ordering,
+ // we need to fire these timers first.
+ NavigableSet<TimerData> earlierTimers =
+ timerInternals.getModifiedTimersOrdered(timerData.getDomain()).headSet(timerData, true);
+ while (!earlierTimers.isEmpty()) {
+ TimerData insertedTimer = earlierTimers.pollFirst();
+ if (timerModified(insertedTimer)) {
+ continue;
+ }
+ // Make sure to register this timer as deleted. This could be a timer that was originally
+ // set for the future
+ // and not in the bundle but was reset to an earlier time in this bundle. If we don't
+ // explicitly delete the
+ // future timer, then it will still fire.
+ timerInternals.deleteTimer(insertedTimer);
+ processTimer(insertedTimer, gbkResult.getValue().key());
Review comment:
Ah, ok, the `earlierTimers` Set is updated from `processTimer`, if
needed. Makes sense.
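
For anyone else reading this thread, here is a minimal standalone sketch of why the loop can pick up timers set during `processTimer`. It is not the Beam code: it only uses `java.util`, plain `Long` timestamps stand in for `TimerData`, and the `setsOnFire` map is a hypothetical stand-in for user code that sets a new timer while one is firing. It also assumes that `getModifiedTimersOrdered` returns the backing set itself rather than a copy, which is what the comment above implies. The relevant fact is that `NavigableSet.headSet(E, boolean)` returns a live view, so elements added to the backing set later show up in it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

public class HeadSetViewDemo {
  // Fires every timestamp <= current from `modified`, in ascending order.
  // `setsOnFire` is hypothetical: firing key k registers a new timestamp setsOnFire.get(k).
  public static List<Long> fireUpTo(
      NavigableSet<Long> modified, long current, Map<Long, Long> setsOnFire) {
    List<Long> fired = new ArrayList<>();
    // headSet returns a view backed by `modified`, not a snapshot.
    NavigableSet<Long> earlier = modified.headSet(current, true);
    while (!earlier.isEmpty()) {
      Long next = earlier.pollFirst(); // also removes the element from `modified`
      fired.add(next);
      Long spawned = setsOnFire.get(next);
      if (spawned != null) {
        // Added to the backing set; it appears in `earlier` only if spawned <= current.
        modified.add(spawned);
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    NavigableSet<Long> modified = new TreeSet<>(List.of(10L, 30L));
    // Firing 10 sets a timer at 20; firing 30 sets a timer at 70.
    List<Long> fired = fireUpTo(modified, 50L, Map.of(10L, 20L, 30L, 70L));
    System.out.println(fired);    // [10, 20, 30]
    System.out.println(modified); // [70]
  }
}
```

The timer at 20 is created while 10 is firing and is still fired before 30, while the timer at 70 stays in the backing set because it falls outside the view's upper bound.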