This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 00a94ad50ba [FLINK-36416][table][runtime] Enable splittable timers for
temporal join, temporal sort and windowed aggregations
00a94ad50ba is described below
commit 00a94ad50ba74f2171bd242e8e7f1c593288f95a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Oct 1 17:18:21 2024 +0200
[FLINK-36416][table][runtime] Enable splittable timers for temporal join,
temporal sort and windowed aggregations
This is a follow up for
https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing
. Temporal join, temporal sort and both windowed and table windowed
aggregations in Table API/SQL can have large amount of registered/fired time,
while at the same time enabling splittable timers shouldn't cause any side
effects for those operators.
---
.../join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java | 5 +++++
.../flink/table/runtime/operators/sort/BaseTemporalSortOperator.java | 5 +++++
.../operators/window/groupwindow/operator/WindowOperator.java | 5 +++++
3 files changed, 15 insertions(+)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
index f7b36d2fcb0..62d28a77bae 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
@@ -80,6 +80,11 @@ public abstract class
BaseTwoInputStreamOperatorWithStateRetention
this.stateCleaningEnabled = minRetentionTime > 1;
}
+ @Override
+ public boolean useSplittableTimers() {
+ return true;
+ }
+
@Override
public void open() throws Exception {
initializeTimerService();
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java
index 355a246e9f4..a12e01036c4 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java
@@ -38,6 +38,11 @@ abstract class BaseTemporalSortOperator extends
AbstractStreamOperator<RowData>
BaseTemporalSortOperator() {}
+ @Override
+ public boolean useSplittableTimers() {
+ return true;
+ }
+
@Override
public void open() throws Exception {
InternalTimerService<VoidNamespace> internalTimerService =
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
index 8e910375374..8a2c04bf36b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
@@ -208,6 +208,11 @@ public abstract class WindowOperator<K, W extends Window>
extends AbstractStream
this.recordCounter = RecordCounter.of(inputCountIndex);
}
+ @Override
+ public boolean useSplittableTimers() {
+ return true;
+ }
+
WindowOperator(
GroupWindowAssigner<W> windowAssigner,
Trigger<W> trigger,