This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bab14f9fabec6adc2f2a6aad8ecbeef59e34e911 Author: xuyang <[email protected]> AuthorDate: Wed Jan 24 20:09:39 2024 +0800 [FLINK-34100][table] Fix the window table function outputs wrong row kind This closes #24162 --- .../operator/WindowTableFunctionOperatorBase.java | 4 +- .../AlignedWindowTableFunctionOperatorTest.java | 59 ++++++ .../UnalignedWindowTableFunctionOperatorTest.java | 231 +++++++++++++++++++++ 3 files changed, 293 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java index 9c60ac9af58..8f8fafbe356 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java @@ -113,7 +113,9 @@ public abstract class WindowTableFunctionOperatorBase extends TableStreamOperato 2, TimestampData.fromEpochMillis( toEpochMills(window.maxTimestamp(), shiftTimeZone))); - collector.collect(outRow.replace(inputRow, windowProperties)); + outRow.replace(inputRow, windowProperties); + outRow.setRowKind(inputRow.getRowKind()); + collector.collect(outRow); } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java index b22c9a3b1b7..2d6569ffa97 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.Cum import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SlidingWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.TumblingWindowAssigner; +import org.apache.flink.types.RowKind; import org.junit.Test; import org.junit.runner.RunWith; @@ -276,6 +277,64 @@ public class AlignedWindowTableFunctionOperatorTest extends WindowTableFunctionO testHarness.close(); } + @Test + public void testConsumingChangelogRecords() throws Exception { + final TumblingWindowAssigner assigner = TumblingWindowAssigner.of(Duration.ofSeconds(3)); + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(assigner, shiftTimeZone); + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + // process elements + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + testHarness.processElement(binaryRecord(RowKind.INSERT, "key1", 1, 20L)); + testHarness.processElement(binaryRecord(RowKind.UPDATE_BEFORE, "key1", 1, 30L)); + testHarness.processElement(binaryRecord(RowKind.UPDATE_AFTER, "key1", 1, 40L)); + testHarness.processWatermark(new Watermark(999)); + // append 3 fields: window_start, window_end, window_time + expectedOutput.add( + binaryRecord( + RowKind.INSERT, "key1", 1, 20L, localMills(0L), localMills(3000L), 2999L)); + expectedOutput.add( + binaryRecord( + RowKind.UPDATE_BEFORE, + "key1", + 1, + 30L, + localMills(0L), + localMills(3000L), + 2999L)); + expectedOutput.add( + binaryRecord( + RowKind.UPDATE_AFTER, + "key1", + 1, + 40L, + localMills(0L), + localMills(3000L), + 2999L)); + expectedOutput.add(new Watermark(999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(9999)); + expectedOutput.add(new Watermark(9999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // late records would not be dropped + testHarness.processElement(binaryRecord(RowKind.DELETE, "key1", 1, 200L)); + + expectedOutput.add( + binaryRecord( + RowKind.DELETE, "key1", 1, 200L, localMills(0L), localMills(3000L), 2999L)); + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + testHarness.close(); + } + private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( GroupWindowAssigner<TimeWindow> windowAssigner, ZoneId shiftTimeZone) throws Exception { AlignedWindowTableFunctionOperator operator = diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java index 3bc2df1bea2..e7dd43d77cd 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.Ses import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import org.apache.flink.types.RowKind; import org.junit.Test; import org.junit.runner.RunWith; @@ -43,6 +44,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.concurrent.ConcurrentLinkedQueue; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link UnalignedWindowTableFunctionOperator}. */ @@ -156,6 +158,140 @@ public class UnalignedWindowTableFunctionOperatorTest extends WindowTableFunctio testHarness.close(); } + @Test + public void testEventTimeSessionWindowsWithChangelog() throws Exception { + final SessionWindowAssigner assigner = SessionWindowAssigner.withGap(Duration.ofSeconds(3)); + UnalignedWindowTableFunctionOperator operator = createOperator(assigner, ROW_TIME_INDEX); + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(operator); + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + // process elements + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(binaryRecord(RowKind.INSERT, "key1", 1, 20L)); + testHarness.processElement(binaryRecord(RowKind.DELETE, "key2", 1, 1999L)); + testHarness.processElement(binaryRecord(RowKind.INSERT, "key3", 1, 2999L)); + testHarness.processElement(binaryRecord(RowKind.INSERT, "key4", 1, 1999L)); + testHarness.processWatermark(new Watermark(999)); + expectedOutput.add(new Watermark(999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processElement(binaryRecord(RowKind.DELETE, "key4", 1, 2999L)); + + testHarness.processWatermark(new Watermark(1999)); + expectedOutput.add(new Watermark(1999)); + + testHarness.processElement(binaryRecord(RowKind.UPDATE_BEFORE, "key3", 1, 3999L)); + + testHarness.processElement(binaryRecord(RowKind.UPDATE_AFTER, "key3", 1, 4999L)); + + testHarness.processWatermark(new Watermark(2999)); + expectedOutput.add(new Watermark(2999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(3999)); + // append 3 fields: window_start, window_end, window_time + expectedOutput.add( + binaryRecord( + RowKind.INSERT, "key1", 1, 20L, localMills(20L), localMills(3020L), 3019L)); + expectedOutput.add(new Watermark(3999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(4999)); + expectedOutput.add( + binaryRecord( + RowKind.DELETE, + "key2", + 1, + 1999L, + localMills(1999L), + localMills(4999L), + 4998L)); + expectedOutput.add(new Watermark(4999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.processWatermark(new Watermark(5999)); + expectedOutput.add( + binaryRecord( + RowKind.INSERT, + "key4", + 1, + 1999L, + localMills(1999L), + localMills(5999L), + 5998L)); + expectedOutput.add( + binaryRecord( + RowKind.DELETE, + "key4", + 1, + 2999L, + localMills(1999L), + localMills(5999L), + 5998L)); + expectedOutput.add(new Watermark(5999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // do a snapshot, close and restore again + testHarness.prepareSnapshotPreBarrier(0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); + testHarness.close(); + + expectedOutput.clear(); + testHarness = createTestHarness(operator); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.processWatermark(new Watermark(7999)); + + expectedOutput.add( + binaryRecord( + RowKind.INSERT, + "key3", + 1, + 2999L, + localMills(2999L), + localMills(7999L), + 7998L)); + expectedOutput.add( + binaryRecord( + RowKind.UPDATE_BEFORE, + "key3", + 1, + 3999L, + localMills(2999L), + localMills(7999L), + 7998L)); + expectedOutput.add( + binaryRecord( + RowKind.UPDATE_AFTER, + "key3", + 1, + 4999L, + localMills(2999L), + localMills(7999L), + 7998L)); + expectedOutput.add(new Watermark(7999)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + @Test public void testProcessTimeSessionWindows() throws Exception { final SessionWindowAssigner assigner = @@ -246,6 +382,101 @@ public class UnalignedWindowTableFunctionOperatorTest extends WindowTableFunctio testHarness.close(); } + @Test + public void testProcessTimeSessionWindowsWithChangelog() throws Exception { + final SessionWindowAssigner assigner = + SessionWindowAssigner.withGap(Duration.ofSeconds(3)).withProcessingTime(); + UnalignedWindowTableFunctionOperator operator = createOperator(assigner, -1); + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = + createTestHarness(operator); + testHarness.setup(OUT_SERIALIZER); + testHarness.open(); + + // process elements + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + // timestamp is ignored in processing time + testHarness.setProcessingTime(20L); + testHarness.processElement(binaryRecord(RowKind.INSERT, "key1", 1, Long.MAX_VALUE)); + testHarness.processElement(binaryRecord(RowKind.INSERT, "key2", 1, Long.MAX_VALUE)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.setProcessingTime(1999); + testHarness.processElement(binaryRecord(RowKind.DELETE, "key3", 1, Long.MAX_VALUE)); + testHarness.processElement(binaryRecord(RowKind.DELETE, "key2", 1, Long.MAX_VALUE)); + + testHarness.setProcessingTime(2999); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.setProcessingTime(3999L); + + // append 3 fields: window_start, window_end, window_time + expectedOutput.add( + binaryRecord( + RowKind.INSERT, + "key1", + 1, + Long.MAX_VALUE, + localMills(20L), + localMills(3020L), + 3019L)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + // do a snapshot, close and restore again + testHarness.prepareSnapshotPreBarrier(0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0); + testHarness.close(); + + expectedOutput.clear(); + testHarness = createTestHarness(operator); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(4999); + + expectedOutput.add( + binaryRecord( + RowKind.INSERT, + "key2", + 1, + Long.MAX_VALUE, + localMills(20L), + localMills(4999L), + 4998L)); + + expectedOutput.add( + binaryRecord( + RowKind.DELETE, + "key3", + 1, + Long.MAX_VALUE, + localMills(1999L), + localMills(4999L), + 4998L)); + + expectedOutput.add( + binaryRecord( + RowKind.DELETE, + "key2", + 1, + Long.MAX_VALUE, + localMills(20L), + localMills(4999L), + 4998L)); + + ASSERTER.assertOutputEqualsSorted( + "Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + @Test public void testSessionWindowsWithoutPartitionKeys() throws Exception { final SessionWindowAssigner assigner = SessionWindowAssigner.withGap(Duration.ofSeconds(3));
