Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5908#discussion_r185749320
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
---
@@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace,
long time) {
@Override
public void registerEventTimeTimer(N namespace, long time) {
- InternalTimer<K, N> timer = new InternalTimer<>(time, (K)
keyContext.getCurrentKey(), namespace);
- Set<InternalTimer<K, N>> timerSet =
getEventTimeTimerSetForTimer(timer);
- if (timerSet.add(timer)) {
+ InternalTimer<K, N> timer = new InternalTimer<>(time, (K)
keyContext.getCurrentKey(), namespace,
+
this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
+ Map<String, InternalTimer<K, N>> timerMap =
getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
+ InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(),
timer);
+ if (prev == null) {
eventTimeTimersQueue.add(timer);
}
}
@Override
public void deleteProcessingTimeTimer(N namespace, long time) {
- InternalTimer<K, N> timer = new InternalTimer<>(time, (K)
keyContext.getCurrentKey(), namespace);
- Set<InternalTimer<K, N>> timerSet =
getProcessingTimeTimerSetForTimer(timer);
- if (timerSet.remove(timer)) {
+ Map<String, InternalTimer<K, N>> timerMap =
getProcessingTimeTimerSetForTimer((K) keyContext.getCurrentKey());
+ String key =
InternalTimer.buildHashKey(keyContext.getCurrentKey().toString(),
namespace.toString(), time);
+ InternalTimer<K, N> timer = timerMap.get(key);
+ if (timer != null) {
+
timer.markDelete(this.knInternalTimeServiceManager.getStateTableVersion().intValue());
processingTimeTimersQueue.remove(timer);
}
+ this.knInternalTimeServiceManager.getReadLock().lock();
+ try {
+ if
(this.knInternalTimeServiceManager.getSnapshotVersions().size() == 0) {
+ timerMap.remove(key);
--- End diff --
This looks like it could take a very long time (until the timer triggers)
until a timer is truly removed when the remove happened while there was a
snapshot ongoing? This could potentially accumulate a lot of deleted timers.
---