je-ik commented on a change in pull request #16650:
URL: https://github.com/apache/beam/pull/16650#discussion_r796370946
##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -174,45 +168,46 @@ public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkRes
for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
delegateEvaluator.processElement(windowedValue);
}
- PriorityQueue<TimerData> toBeFiredTimers =
- new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
- Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
- Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
for (TimerData timerData : gbkResult.getValue().timersIterable()) {
- toBeFiredTimers.add(timerData);
- switch (timerData.getDomain()) {
- case EVENT_TIME:
- maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
- break;
- case PROCESSING_TIME:
- maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
- break;
- case SYNCHRONIZED_PROCESSING_TIME:
- maxSynchronizedProcessingTime =
- Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
+ // Get any new or modified timers that are earlier than the current one. In order to
+ // maintain timer ordering,
+ // we need to fire these timers first.
+ NavigableSet<TimerData> earlierTimers =
+ timerInternals.getModifiedTimersOrdered(timerData.getDomain()).headSet(timerData, true);
+ while (!earlierTimers.isEmpty()) {
+ TimerData insertedTimer = earlierTimers.pollFirst();
+ if (timerModified(insertedTimer)) {
+ continue;
+ }
+ // Make sure to register this timer as deleted. This could be a timer that was originally
+ // set for the future
+ // and not in the bundle but was reset to an earlier time in this bundle. If we don't
+ // explicitly delete the
+ // future timer, then it will still fire.
+ timerInternals.deleteTimer(insertedTimer);
+ processTimer(insertedTimer, gbkResult.getValue().key());
Review comment:
I'm reading this quite quickly, but how does this handle the case where
`earlierTimers` contains a looping timer that sets itself in `processTimer` for
a time earlier than `timerData`?
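A small model of the question above, under stated assumptions: it uses a plain `TreeSet` and a hypothetical `Timer` record instead of Beam's `TimerInternals`/`TimerData`, and it assumes the set returned by `getModifiedTimersOrdered` behaves like a live `TreeSet` view (not verified against the PR). Because `headSet(target, true)` is backed by the underlying set, a looping timer that re-arms itself at or before the target is picked up by the same `while` loop and fires before the target.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class LoopingTimerSketch {
  // Hypothetical, simplified timer (not Beam's TimerData): ordered by timestamp, then id.
  record Timer(long timestamp, String id) implements Comparable<Timer> {
    @Override
    public int compareTo(Timer other) {
      int byTime = Long.compare(timestamp, other.timestamp);
      return byTime != 0 ? byTime : id.compareTo(other.id);
    }
  }

  /**
   * Fires all pending timers at or before target, earliest first. A timer with id "loop"
   * re-arms itself `interval` later, but only while the new time is at most loopUntil.
   */
  static List<Timer> fireUpTo(TreeSet<Timer> pending, Timer target, long interval, long loopUntil) {
    List<Timer> fired = new ArrayList<>();
    // TreeSet.headSet returns a live view backed by `pending`, so timers added while
    // firing are picked up here if they sort at or before `target`.
    NavigableSet<Timer> earlier = pending.headSet(target, true);
    while (!earlier.isEmpty()) {
      Timer timer = earlier.pollFirst(); // also removes it from `pending`
      fired.add(timer);
      long next = timer.timestamp() + interval;
      if (timer.id().equals("loop") && next <= loopUntil) {
        pending.add(new Timer(next, "loop")); // simulated looping timer re-arming itself
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    TreeSet<Timer> pending = new TreeSet<>();
    pending.add(new Timer(0, "loop"));
    Timer target = new Timer(10, "target");
    pending.add(target);
    System.out.println(fireUpTo(pending, target, 3, 100));
    System.out.println(pending);
  }
}
```

In `main`, the looping timer fires at 0, 3, 6 and 9 before the target at 10, and the re-armed timer at 12 stays pending. If a timer kept re-arming at or before the target (for example with `interval` set to 0 here), the loop would never terminate, so that case seems worth a test. If the real set were a snapshot copy rather than a view, timers re-armed during `processTimer` would instead be missed by this loop.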
##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -366,32 +365,32 @@ synchronized void updateTimers(TimerUpdate update) {
keyTimers.add(timer);
}
- existingTimersForKey.put(
- timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId(), timer);
+ existingTimersForKey.put(timer.getNamespace(), timerKey, timer);
}
}
for (TimerData timer : update.getDeletedTimers()) {
if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+ String timerKey = timer.getTimerId() + '+' + timer.getTimerFamilyId();
Review comment:
The construction of the `timerKey` seems repeated, could we wrap it in a
helper method?
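A possible shape for that helper, as a sketch: `timerKey` is a hypothetical name, and `TimerIds`/`SimpleTimer` are local stand-ins for the two `TimerData` getters used in the diff (`getTimerId()`, `getTimerFamilyId()`), so the example runs without Beam on the classpath.

```java
public class TimerKeySketch {
  // Stand-in for the two TimerData getters used in the diff (assumption, not Beam's type).
  interface TimerIds {
    String getTimerId();

    String getTimerFamilyId();
  }

  record SimpleTimer(String timerId, String timerFamilyId) implements TimerIds {
    @Override
    public String getTimerId() {
      return timerId;
    }

    @Override
    public String getTimerFamilyId() {
      return timerFamilyId;
    }
  }

  // Hypothetical helper: the single place that builds the key used for existingTimersForKey.
  static String timerKey(TimerIds timer) {
    return timer.getTimerId() + '+' + timer.getTimerFamilyId();
  }

  public static void main(String[] args) {
    System.out.println(timerKey(new SimpleTimer("t1", "family"))); // t1+family
  }
}
```

One thing a helper makes easy to revisit: plain concatenation is ambiguous if ids can contain `+`, since ("a+b", "c") and ("a", "b+c") both map to `a+b+c`. If that can happen, a small composite key type would avoid the collision.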
##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4085,7 +4085,8 @@ public void onTimer(OutputReceiver<Long> r) {
ValidatesRunner.class,
UsesStatefulParDo.class,
UsesTimersInParDo.class,
- UsesLoopingTimer.class
+ UsesLoopingTimer.class,
+ UsesStrictTimerOrdering.class
Review comment:
Should `UsesLoopingTimer` imply `UsesStrictTimerOrdering` automatically?
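One way to get that implication, sketched under the assumption that runners filter these tests with JUnit 4 `@Category` (whose include/exclude filter, as I understand it, also matches subtypes of a category), is to have one marker interface extend the other. The interfaces below are local re-declarations for illustration, not Beam's actual definitions; `fallsUnder` is a hypothetical helper that mirrors the subtype check with `Class.isAssignableFrom`.

```java
public class CategoryHierarchySketch {
  // Hypothetical re-declarations of the marker interfaces, for illustration only.
  interface UsesStrictTimerOrdering {}

  interface UsesLoopingTimer extends UsesStrictTimerOrdering {}

  /** Subtype check: is a test tagged with `tag` covered by a filter on `category`? */
  static boolean fallsUnder(Class<?> tag, Class<?> category) {
    return category.isAssignableFrom(tag);
  }

  public static void main(String[] args) {
    System.out.println(fallsUnder(UsesLoopingTimer.class, UsesStrictTimerOrdering.class)); // true
    System.out.println(fallsUnder(UsesStrictTimerOrdering.class, UsesLoopingTimer.class)); // false
  }
}
```

With this hierarchy, a runner that excludes `UsesStrictTimerOrdering` would also skip every looping-timer test, and the explicit second category on the test above would become redundant.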
--
This is an automated message from the Apache Git Service.