[FLINK-5237] Don't Fire Processing-Time Timer in registerTimer()

Immediately firing the timer can lead to endless recursion if the onTimer() method sets a timer for the past.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac2c58c6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac2c58c6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac2c58c6 Branch: refs/heads/release-1.2 Commit: ac2c58c669dfb894915707e9d63fc9afe1157445 Parents: ebc1388 Author: Aljoscha Krettek <[email protected]> Authored: Tue Jan 10 14:15:42 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Jan 11 15:47:30 2017 +0100 ---------------------------------------------------------------------- .../runtime/tasks/TestProcessingTimeService.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ac2c58c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 3c33ad3..b4e7e97 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -53,7 +53,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { } }); } - + public void setCurrentTime(long timestamp) throws Exception { this.currentTime = timestamp; @@ -90,15 +90,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { CallbackTask callbackTask = new CallbackTask(target); - if (timestamp <= currentTime) { - try { - callbackTask.onProcessingTime(timestamp); - } catch (Exception e) { - throw new RuntimeException(e); - } - 
} else { - priorityQueue.offer(Tuple2.of(timestamp, callbackTask)); - } + priorityQueue.offer(Tuple2.of(timestamp, callbackTask)); return callbackTask; }
