[FLINK-5237] Don't Fire Processing-Time Timer in registerTimer()

Immediately firing the timer can lead to endless recursion if the
onTimer() method sets a timer for the past.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac2c58c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac2c58c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac2c58c6

Branch: refs/heads/release-1.2
Commit: ac2c58c669dfb894915707e9d63fc9afe1157445
Parents: ebc1388
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Jan 10 14:15:42 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Jan 11 15:47:30 2017 +0100

----------------------------------------------------------------------
 .../runtime/tasks/TestProcessingTimeService.java        | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac2c58c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 3c33ad3..b4e7e97 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -53,7 +53,7 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
                        }
                });
        }
-       
+
        public void setCurrentTime(long timestamp) throws Exception {
                this.currentTime = timestamp;
 
@@ -90,15 +90,7 @@ public class TestProcessingTimeService extends 
ProcessingTimeService {
 
                CallbackTask callbackTask = new CallbackTask(target);
 
-               if (timestamp <= currentTime) {
-                       try {
-                               callbackTask.onProcessingTime(timestamp);
-                       } catch (Exception e) {
-                               throw new RuntimeException(e);
-                       }
-               } else {
-                       priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
-               }
+               priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
 
                return callbackTask;
        }

Reply via email to