Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185749320
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
    @@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, 
long time) {
     
        @Override
        public void registerEventTimeTimer(N namespace, long time) {
    -           InternalTimer<K, N> timer = new InternalTimer<>(time, (K) 
keyContext.getCurrentKey(), namespace);
    -           Set<InternalTimer<K, N>> timerSet = 
getEventTimeTimerSetForTimer(timer);
    -           if (timerSet.add(timer)) {
    +           InternalTimer<K, N> timer = new InternalTimer<>(time, (K) 
keyContext.getCurrentKey(), namespace,
    +                   
this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
    +           Map<String, InternalTimer<K, N>> timerMap = 
getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
    +           InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), 
timer);
    +           if (prev == null) {
                        eventTimeTimersQueue.add(timer);
                }
        }
     
        @Override
        public void deleteProcessingTimeTimer(N namespace, long time) {
    -           InternalTimer<K, N> timer = new InternalTimer<>(time, (K) 
keyContext.getCurrentKey(), namespace);
    -           Set<InternalTimer<K, N>> timerSet = 
getProcessingTimeTimerSetForTimer(timer);
    -           if (timerSet.remove(timer)) {
    +           Map<String, InternalTimer<K, N>> timerMap = 
getProcessingTimeTimerSetForTimer((K) keyContext.getCurrentKey());
    +           String key = 
InternalTimer.buildHashKey(keyContext.getCurrentKey().toString(), 
namespace.toString(), time);
    +           InternalTimer<K, N> timer = timerMap.get(key);
    +           if (timer != null) {
    +                   
timer.markDelete(this.knInternalTimeServiceManager.getStateTableVersion().intValue());
                        processingTimeTimersQueue.remove(timer);
                }
    +           this.knInternalTimeServiceManager.getReadLock().lock();
    +           try {
    +                   if 
(this.knInternalTimeServiceManager.getSnapshotVersions().size() == 0) {
    +                           timerMap.remove(key);
    --- End diff --
    
    This looks like it could take a very long time (until the timer triggers) 
before a timer is truly removed, if the removal happened while a snapshot was 
ongoing? This could potentially accumulate a lot of deleted timers.


---

Reply via email to