This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b163c715ebda99d58c9161023c260f5ba0ea88a
Author: liliwei <[email protected]>
AuthorDate: Sat Oct 23 17:17:07 2021 +0800

    [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger optimization
---
 .../triggers/ContinuousEventTimeTrigger.java | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4ff2d93..8d0dcb8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -59,12 +59,10 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
             ctx.registerEventTimeTimer(window.maxTimestamp());
         }
 
-        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-        if (fireTimestamp.get() == null) {
-            long start = timestamp - (timestamp % interval);
-            long nextFireTimestamp = start + interval;
-            ctx.registerEventTimeTimer(nextFireTimestamp);
-            fireTimestamp.add(nextFireTimestamp);
+        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
+        if (fireTimestampState.get() == null) {
+            registerNextFireTimestamp(
+                    timestamp - (timestamp % interval), window, ctx, fireTimestampState);
         }
 
         return TriggerResult.CONTINUE;
@@ -83,8 +81,7 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 
         if (fireTimestamp != null && fireTimestamp == time) {
             fireTimestampState.clear();
-            fireTimestampState.add(time + interval);
-            ctx.registerEventTimeTimer(time + interval);
+            registerNextFireTimestamp(time, window, ctx, fireTimestampState);
             return TriggerResult.FIRE;
         }
 
@@ -149,4 +146,12 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
             return Math.min(value1, value2);
         }
     }
+
+    private void registerNextFireTimestamp(
+            long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
+            throws Exception {
+        long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
+        fireTimestampState.add(nextFireTimestamp);
+        ctx.registerEventTimeTimer(nextFireTimestamp);
+    }
 }
