Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185749819
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
    @@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, 
long time) {
     
        @Override
        public void registerEventTimeTimer(N namespace, long time) {
    -           InternalTimer<K, N> timer = new InternalTimer<>(time, (K) 
keyContext.getCurrentKey(), namespace);
    -           Set<InternalTimer<K, N>> timerSet = 
getEventTimeTimerSetForTimer(timer);
    -           if (timerSet.add(timer)) {
    +           InternalTimer<K, N> timer = new InternalTimer<>(time, (K) 
keyContext.getCurrentKey(), namespace,
    +                   
this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
    +           Map<String, InternalTimer<K, N>> timerMap = 
getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
    +           InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), 
timer);
    +           if (prev == null) {
    --- End diff --
    
    What happens if we find a `prev != null` that was marked as deleted? Looks 
like no timer will be inserted even though it should.


---

Reply via email to