This is an automated email from the ASF dual-hosted git repository. xuyangzhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new fdce52cfa2e [FLINK-36663][Window]Fix the first processWatermark has extra data after restore by restore timeService's watermark. (#25637) fdce52cfa2e is described below commit fdce52cfa2ee914d5613976857399e60e84662aa Author: xing1mo <44420959+xing...@users.noreply.github.com> AuthorDate: Tue Jul 8 15:07:38 2025 +0800 [FLINK-36663][Window]Fix the first processWatermark has extra data after restore by restore timeService's watermark. (#25637) --- .../api/operators/InternalTimerService.java | 3 + .../api/operators/InternalTimerServiceImpl.java | 5 + .../state/BatchExecutionInternalTimeService.java | 5 + .../api/operators/TestInternalTimerService.java | 5 + .../tvf/common/AsyncStateWindowAggOperator.java | 3 + .../window/tvf/common/WindowAggOperator.java | 4 +- .../window/SlicingWindowAggOperatorTest.java | 107 +++++++++++++++++++++ 7 files changed, 131 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java index 191c2675779..2e78da83d94 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java @@ -38,6 +38,9 @@ public interface InternalTimerService<N> { /** Returns the current event-time watermark. */ long currentWatermark(); + /** Initializes the event-time watermark after restore. */ + void initializeWatermark(long watermark); + /** * Registers a timer to be fired when processing time passes the given time. The namespace you * pass here will be provided when the timer fires.
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index 269ba493b18..e24e8c9a62d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -224,6 +224,11 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> { return currentWatermark; } + @Override + public void initializeWatermark(long watermark) { + this.currentWatermark = watermark; + } + @Override public void registerProcessingTimeTimer(N namespace, long time) { InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java index 771afecd2ba..8fd71e760d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeService.java @@ -82,6 +82,11 @@ public class BatchExecutionInternalTimeService<K, N> implements InternalTimerSer return currentWatermark; } + @Override + public void initializeWatermark(long watermark) { + this.currentWatermark = watermark; + } + @Override public void registerProcessingTimeTimer(N namespace, long time) { // the currentWatermark == Long.MAX_VALUE indicates the timer was registered from the diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java 
index 826fa8b3c26..a81f5eeff20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java @@ -67,6 +67,11 @@ public class TestInternalTimerService<K, N> implements InternalTimerService<N> { return currentWatermark; } + @Override + public void initializeWatermark(long watermark) { + this.currentWatermark = watermark; + } + @Override public void registerProcessingTimeTimer(N namespace, long time) { @SuppressWarnings("unchecked") diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java index d629618638c..83d4439fd8d 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/async/tvf/common/AsyncStateWindowAggOperator.java @@ -116,6 +116,9 @@ public final class AsyncStateWindowAggOperator<K, W> extends AsyncStateTableStre internalTimerService = getInternalTimerService( "window-timers", windowProcessor.createWindowSerializer(), this); + // Restore the watermark of timerService to prevent expired data from being treated as + // not expired when flushWindowBuffer is executed. 
+ internalTimerService.initializeWatermark(currentWatermark); windowProcessor.open( new WindowProcessorAsyncStateContext<>( diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java index d69c4eaa0c1..093a5882807 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/common/WindowAggOperator.java @@ -145,7 +145,9 @@ public final class WindowAggOperator<K, W> extends TableStreamOperator<RowData> internalTimerService = getInternalTimerService( "window-timers", windowProcessor.createWindowSerializer(), this); - + // Restore the watermark of timerService to prevent expired data from being treated as + // not expired when flushWindowBuffer is executed. 
+ internalTimerService.initializeWatermark(currentWatermark); windowProcessor.open( new WindowProcessorSyncStateContext<>( getContainingTask(), diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java index bb975188f97..aaece6302f1 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java @@ -161,6 +161,113 @@ class SlicingWindowAggOperatorTest extends WindowAggOperatorTestBase { testHarness.close(); } + @TestTemplate + public void testEventTimeHoppingWindowWithExpiredSliceAndRestore() throws Exception { + final SliceAssigner assigner = + SliceAssigners.hopping( + 2, shiftTimeZone, Duration.ofSeconds(3), Duration.ofSeconds(1)); + final SlicingSumAndCountAggsFunction aggsFunction = + new SlicingSumAndCountAggsFunction(assigner); + OneInputStreamOperator<RowData, RowData> operator = + buildWindowOperator(assigner, aggsFunction, 1); + + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(operator); + + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + // 1. 
process elements + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1020L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1001L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1999L))); + + testHarness.processWatermark(new Watermark(2001)); + expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(-1000L), localMills(2000L))); + expectedOutput.add(new Watermark(2001)); + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // 2. do a snapshot, close and restore again + testHarness.prepareSnapshotPreBarrier(0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); + testHarness.close(); + + assertThat(aggsFunction.closeCalled.get()).as("Close was not called.").isGreaterThan(0); + + expectedOutput.clear(); + testHarness = createTestHarness(operator); + testHarness.setup(OUT_SERIALIZER); + testHarness.initializeState(snapshot); + testHarness.open(); + + // 3. process elements + // This slice has already expired, but it also belongs to a window that has not fired yet.
+ testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1500L))); + + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(2998L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(2999L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(2000L))); + + testHarness.processWatermark(new Watermark(2999)); + expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(3000L))); + expectedOutput.add(insertRecord("key2", 4L, 4L, localMills(0L), localMills(3000L))); + expectedOutput.add(new Watermark(2999)); + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @TestTemplate + public void testEventTimeHoppingWindowWithExpiredSliceAndNoRestore() throws Exception { + final SliceAssigner assigner = + SliceAssigners.hopping( + 2, shiftTimeZone, Duration.ofSeconds(3), Duration.ofSeconds(1)); + final SlicingSumAndCountAggsFunction aggsFunction = + new SlicingSumAndCountAggsFunction(assigner); + OneInputStreamOperator<RowData, RowData> operator = + buildWindowOperator(assigner, aggsFunction, 1); + + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(operator); + + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + // 1. process elements + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1020L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1001L))); + testHarness.processElement(insertRecord("key1", 1, fromEpochMillis(1999L))); + + testHarness.processWatermark(new Watermark(2001)); + expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(-1000L), localMills(2000L))); + expectedOutput.add(new Watermark(2001)); + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // 2. 
process elements + // This slice has already expired, but it also belongs to a window that has not fired yet. + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(1500L))); + + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(2998L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(2999L))); + testHarness.processElement(insertRecord("key2", 1, fromEpochMillis(2000L))); + + testHarness.processWatermark(new Watermark(2999)); + expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(3000L))); + expectedOutput.add(insertRecord("key2", 4L, 4L, localMills(0L), localMills(3000L))); + expectedOutput.add(new Watermark(2999)); + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + @TestTemplate void testProcessingTimeHoppingWindows() throws Exception { final SliceAssigner assigner =