[FLINK-5237] Don't Fire Processing-Time Timer in registerTimer()

Immediately firing the timer can lead to endless recursion if the onTimer() method sets a timer for the past.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c8c0288 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c8c0288 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c8c0288 Branch: refs/heads/master Commit: 8c8c02887a27cdc87bb019626f82ec03392ca8ce Parents: 274cc41 Author: Aljoscha Krettek <[email protected]> Authored: Tue Jan 10 14:15:42 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Jan 11 10:35:47 2017 +0100 ---------------------------------------------------------------------- .../runtime/tasks/TestProcessingTimeService.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8c8c0288/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 3c33ad3..b4e7e97 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -53,7 +53,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { } }); } - + public void setCurrentTime(long timestamp) throws Exception { this.currentTime = timestamp; @@ -90,15 +90,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { CallbackTask callbackTask = new CallbackTask(target); - if (timestamp <= currentTime) { - try { - callbackTask.onProcessingTime(timestamp); - } catch (Exception e) { - throw new RuntimeException(e); - } - } 
else { - priorityQueue.offer(Tuple2.of(timestamp, callbackTask)); - } + priorityQueue.offer(Tuple2.of(timestamp, callbackTask)); return callbackTask; }
