This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fdce52cfa2e [FLINK-36663][Window]Fix the first processWatermark has 
extra data after restore by restore timeService's watermark. (#25637)
fdce52cfa2e is described below

commit fdce52cfa2ee914d5613976857399e60e84662aa
Author: xing1mo <44420959+xing...@users.noreply.github.com>
AuthorDate: Tue Jul 8 15:07:38 2025 +0800

    [FLINK-36663][Window]Fix the first processWatermark has extra data after 
restore by restore timeService's watermark. (#25637)
---
 .../api/operators/InternalTimerService.java        |   3 +
 .../api/operators/InternalTimerServiceImpl.java    |   5 +
 .../state/BatchExecutionInternalTimeService.java   |   5 +
 .../api/operators/TestInternalTimerService.java    |   5 +
 .../tvf/common/AsyncStateWindowAggOperator.java    |   3 +
 .../window/tvf/common/WindowAggOperator.java       |   4 +-
 .../window/SlicingWindowAggOperatorTest.java       | 107 +++++++++++++++++++++
 7 files changed, 131 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
index 191c2675779..2e78da83d94 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
@@ -38,6 +38,9 @@ public interface InternalTimerService<N> {
     /** Returns the current event-time watermark. */
     long currentWatermark();
 
+    /** Initializes the watermark after restore. */
+    void initializeWatermark(long watermark);
+
     /**
      * Registers a timer to be fired when processing time passes the given 
time. The namespace you
      * pass here will be provided when the timer fires.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index 269ba493b18..e24e8c9a62d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -224,6 +224,11 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N> {
         return currentWatermark;
     }
 
+    @Override
+    public void initializeWatermark(long watermark) {
+        this.currentWatermark = watermark;
+    }
+
     @Override
     public void registerProcessingTimeTimer(N namespace, long time) {
         InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java
index 771afecd2ba..8fd71e760d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java
@@ -82,6 +82,11 @@ public class BatchExecutionInternalTimeService<K, N> 
implements InternalTimerSer
         return currentWatermark;
     }
 
+    @Override
+    public void initializeWatermark(long watermark) {
+        this.currentWatermark = watermark;
+    }
+
     @Override
     public void registerProcessingTimeTimer(N namespace, long time) {
         // the currentWatermark == Long.MAX_VALUE indicates the timer was 
registered from the
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
index 826fa8b3c26..a81f5eeff20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
@@ -67,6 +67,11 @@ public class TestInternalTimerService<K, N> implements 
InternalTimerService<N> {
         return currentWatermark;
     }
 
+    @Override
+    public void initializeWatermark(long watermark) {
+        this.currentWatermark = watermark;
+    }
+
     @Override
     public void registerProcessingTimeTimer(N namespace, long time) {
         @SuppressWarnings("unchecked")
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java
index d629618638c..83d4439fd8d 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java
@@ -116,6 +116,9 @@ public final class AsyncStateWindowAggOperator<K, W> 
extends AsyncStateTableStre
         internalTimerService =
                 getInternalTimerService(
                         "window-timers", 
windowProcessor.createWindowSerializer(), this);
+        // Restore the watermark of timerService to prevent expired data from 
being treated as
+        // not expired when flushWindowBuffer is executed.
+        internalTimerService.initializeWatermark(currentWatermark);
 
         windowProcessor.open(
                 new WindowProcessorAsyncStateContext<>(
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
index d69c4eaa0c1..093a5882807 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java
@@ -145,7 +145,9 @@ public final class WindowAggOperator<K, W> extends 
TableStreamOperator<RowData>
         internalTimerService =
                 getInternalTimerService(
                         "window-timers", 
windowProcessor.createWindowSerializer(), this);
-
+        // Restore the watermark of timerService to prevent expired data from 
being treated as
+        // not expired when flushWindowBuffer is executed.
+        internalTimerService.initializeWatermark(currentWatermark);
         windowProcessor.open(
                 new WindowProcessorSyncStateContext<>(
                         getContainingTask(),
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
index bb975188f97..aaece6302f1 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java
@@ -161,6 +161,113 @@ class SlicingWindowAggOperatorTest extends 
WindowAggOperatorTestBase {
         testHarness.close();
     }
 
+    @TestTemplate
+    public void testEventTimeHoppingWindowWithExpiredSliceAndRestore() throws 
Exception {
+        final SliceAssigner assigner =
+                SliceAssigners.hopping(
+                        2, shiftTimeZone, Duration.ofSeconds(3), 
Duration.ofSeconds(1));
+        final SlicingSumAndCountAggsFunction aggsFunction =
+                new SlicingSumAndCountAggsFunction(assigner);
+        OneInputStreamOperator<RowData, RowData> operator =
+                buildWindowOperator(assigner, aggsFunction, 1);
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // 1. process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(1020L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(1001L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(1999L)));
+
+        testHarness.processWatermark(new Watermark(2001));
+        expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(-1000L), 
localMills(2000L)));
+        expectedOutput.add(new Watermark(2001));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // 2. do a snapshot, close and restore again
+        testHarness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+        testHarness.close();
+
+        assertThat(aggsFunction.closeCalled.get()).as("Close was not 
called.").isGreaterThan(0);
+
+        expectedOutput.clear();
+        testHarness = createTestHarness(operator);
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        // 3. process elements
+        // The slice has expired, but it belongs to another window.
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1500L)));
+
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(2998L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(2999L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(2000L)));
+
+        testHarness.processWatermark(new Watermark(2999));
+        expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), 
localMills(3000L)));
+        expectedOutput.add(insertRecord("key2", 4L, 4L, localMills(0L), 
localMills(3000L)));
+        expectedOutput.add(new Watermark(2999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.close();
+    }
+
+    @TestTemplate
+    public void testEventTimeHoppingWindowWithExpiredSliceAndNoRestore() 
throws Exception {
+        final SliceAssigner assigner =
+                SliceAssigners.hopping(
+                        2, shiftTimeZone, Duration.ofSeconds(3), 
Duration.ofSeconds(1));
+        final SlicingSumAndCountAggsFunction aggsFunction =
+                new SlicingSumAndCountAggsFunction(assigner);
+        OneInputStreamOperator<RowData, RowData> operator =
+                buildWindowOperator(assigner, aggsFunction, 1);
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // 1. process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(1020L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(1001L)));
+        testHarness.processElement(insertRecord("key1", 1, 
fromEpochMillis(1999L)));
+
+        testHarness.processWatermark(new Watermark(2001));
+        expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(-1000L), 
localMills(2000L)));
+        expectedOutput.add(new Watermark(2001));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // 2. process elements
+        // The slice has expired, but it belongs to another window.
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(1500L)));
+
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(2998L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(2999L)));
+        testHarness.processElement(insertRecord("key2", 1, 
fromEpochMillis(2000L)));
+
+        testHarness.processWatermark(new Watermark(2999));
+        expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), 
localMills(3000L)));
+        expectedOutput.add(insertRecord("key2", 4L, 4L, localMills(0L), 
localMills(3000L)));
+        expectedOutput.add(new Watermark(2999));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.close();
+    }
+
     @TestTemplate
     void testProcessingTimeHoppingWindows() throws Exception {
         final SliceAssigner assigner =

Reply via email to