This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b163c715ebda99d58c9161023c260f5ba0ea88a
Author: liliwei <[email protected]>
AuthorDate: Sat Oct 23 17:17:07 2021 +0800

    [FLINK-20443][API/DataStream] ContinuousEventTimeTrigger  optimization
---
 .../triggers/ContinuousEventTimeTrigger.java        | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 4ff2d93..8d0dcb8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -59,12 +59,10 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
             ctx.registerEventTimeTimer(window.maxTimestamp());
         }
 
-        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-        if (fireTimestamp.get() == null) {
-            long start = timestamp - (timestamp % interval);
-            long nextFireTimestamp = start + interval;
-            ctx.registerEventTimeTimer(nextFireTimestamp);
-            fireTimestamp.add(nextFireTimestamp);
+        ReducingState<Long> fireTimestampState = 
ctx.getPartitionedState(stateDesc);
+        if (fireTimestampState.get() == null) {
+            registerNextFireTimestamp(
+                    timestamp - (timestamp % interval), window, ctx, 
fireTimestampState);
         }
 
         return TriggerResult.CONTINUE;
@@ -83,8 +81,7 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
 
         if (fireTimestamp != null && fireTimestamp == time) {
             fireTimestampState.clear();
-            fireTimestampState.add(time + interval);
-            ctx.registerEventTimeTimer(time + interval);
+            registerNextFireTimestamp(time, window, ctx, fireTimestampState);
             return TriggerResult.FIRE;
         }
 
@@ -149,4 +146,12 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
             return Math.min(value1, value2);
         }
     }
+
+    private void registerNextFireTimestamp(
+            long time, W window, TriggerContext ctx, ReducingState<Long> 
fireTimestampState)
+            throws Exception {
+        long nextFireTimestamp = Math.min(time + interval, 
window.maxTimestamp());
+        fireTimestampState.add(nextFireTimestamp);
+        ctx.registerEventTimeTimer(nextFireTimestamp);
+    }
 }

Reply via email to