Repository: flink Updated Branches: refs/heads/release-1.0 2c78be3e9 -> 954cdc113
[hotfix] Fix ContinuousProcessingTimeTrigger Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954cdc11 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954cdc11 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954cdc11 Branch: refs/heads/release-1.0 Commit: 954cdc113693cac6e68d6d7022e76ddedac29a9f Parents: 2c78be3 Author: Aljoscha Krettek <[email protected]> Authored: Mon Mar 21 10:57:10 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Mar 21 11:13:59 2016 +0100 ---------------------------------------------------------------------- .../api/windowing/triggers/ContinuousProcessingTimeTrigger.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/954cdc11/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index eacdf0b..a952c42 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -58,14 +58,14 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O long start = currentTime - (currentTime % interval); fireState.update(start + interval); - ctx.registerProcessingTimeTimer(nextFireTimestamp); + ctx.registerProcessingTimeTimer((start + interval)); return TriggerResult.CONTINUE; } if (currentTime > nextFireTimestamp) { long start = currentTime - (currentTime % interval); fireState.update(start + interval); - ctx.registerProcessingTimeTimer(nextFireTimestamp); + ctx.registerProcessingTimeTimer((start + interval)); return TriggerResult.FIRE; }
