This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 00a94ad50ba [FLINK-36416][table][runtime] Enable splittable timers for 
temporal join, temporal sort and windowed aggregations
00a94ad50ba is described below

commit 00a94ad50ba74f2171bd242e8e7f1c593288f95a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Oct 1 17:18:21 2024 +0200

    [FLINK-36416][table][runtime] Enable splittable timers for temporal join, 
temporal sort and windowed aggregations
    
    This is a follow-up for 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing
 . Temporal join, temporal sort and both windowed and table windowed 
aggregations in Table API/SQL can have a large number of registered/fired 
timers, while at the same time enabling splittable timers shouldn't cause any 
side effects for those operators.
---
 .../join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java  | 5 +++++
 .../flink/table/runtime/operators/sort/BaseTemporalSortOperator.java | 5 +++++
 .../operators/window/groupwindow/operator/WindowOperator.java        | 5 +++++
 3 files changed, 15 insertions(+)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
index f7b36d2fcb0..62d28a77bae 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
@@ -80,6 +80,11 @@ public abstract class 
BaseTwoInputStreamOperatorWithStateRetention
         this.stateCleaningEnabled = minRetentionTime > 1;
     }
 
+    @Override
+    public boolean useSplittableTimers() {
+        return true;
+    }
+
     @Override
     public void open() throws Exception {
         initializeTimerService();
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java
index 355a246e9f4..a12e01036c4 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java
@@ -38,6 +38,11 @@ abstract class BaseTemporalSortOperator extends 
AbstractStreamOperator<RowData>
 
     BaseTemporalSortOperator() {}
 
+    @Override
+    public boolean useSplittableTimers() {
+        return true;
+    }
+
     @Override
     public void open() throws Exception {
         InternalTimerService<VoidNamespace> internalTimerService =
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
index 8e910375374..8a2c04bf36b 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
@@ -208,6 +208,11 @@ public abstract class WindowOperator<K, W extends Window> 
extends AbstractStream
         this.recordCounter = RecordCounter.of(inputCountIndex);
     }
 
+    @Override
+    public boolean useSplittableTimers() {
+        return true;
+    }
+
     WindowOperator(
             GroupWindowAssigner<W> windowAssigner,
             Trigger<W> trigger,

Reply via email to