This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bab14f9fabec6adc2f2a6aad8ecbeef59e34e911
Author: xuyang <[email protected]>
AuthorDate: Wed Jan 24 20:09:39 2024 +0800

    [FLINK-34100][table] Fix the window table function outputs wrong row kind
    
    This closes #24162
---
 .../operator/WindowTableFunctionOperatorBase.java  |   4 +-
 .../AlignedWindowTableFunctionOperatorTest.java    |  59 ++++++
 .../UnalignedWindowTableFunctionOperatorTest.java  | 231 +++++++++++++++++++++
 3 files changed, 293 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
index 9c60ac9af58..8f8fafbe356 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
@@ -113,7 +113,9 @@ public abstract class WindowTableFunctionOperatorBase 
extends TableStreamOperato
                     2,
                     TimestampData.fromEpochMillis(
                             toEpochMills(window.maxTimestamp(), 
shiftTimeZone)));
-            collector.collect(outRow.replace(inputRow, windowProperties));
+            outRow.replace(inputRow, windowProperties);
+            outRow.setRowKind(inputRow.getRowKind());
+            collector.collect(outRow);
         }
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
index b22c9a3b1b7..2d6569ffa97 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.Cum
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SlidingWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.TumblingWindowAssigner;
+import org.apache.flink.types.RowKind;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -276,6 +277,64 @@ public class AlignedWindowTableFunctionOperatorTest 
extends WindowTableFunctionO
         testHarness.close();
     }
 
+    @Test
+    public void testConsumingChangelogRecords() throws Exception {
+        final TumblingWindowAssigner assigner = 
TumblingWindowAssigner.of(Duration.ofSeconds(3));
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(assigner, shiftTimeZone);
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+        testHarness.processElement(binaryRecord(RowKind.INSERT, "key1", 1, 
20L));
+        testHarness.processElement(binaryRecord(RowKind.UPDATE_BEFORE, "key1", 
1, 30L));
+        testHarness.processElement(binaryRecord(RowKind.UPDATE_AFTER, "key1", 
1, 40L));
+        testHarness.processWatermark(new Watermark(999));
+        // append 3 fields: window_start, window_end, window_time
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.INSERT, "key1", 1, 20L, localMills(0L), 
localMills(3000L), 2999L));
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.UPDATE_BEFORE,
+                        "key1",
+                        1,
+                        30L,
+                        localMills(0L),
+                        localMills(3000L),
+                        2999L));
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.UPDATE_AFTER,
+                        "key1",
+                        1,
+                        40L,
+                        localMills(0L),
+                        localMills(3000L),
+                        2999L));
+        expectedOutput.add(new Watermark(999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(9999));
+        expectedOutput.add(new Watermark(9999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // late records would not be dropped
+        testHarness.processElement(binaryRecord(RowKind.DELETE, "key1", 1, 
200L));
+
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.DELETE, "key1", 1, 200L, localMills(0L), 
localMills(3000L), 2999L));
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
+    }
+
     private OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
             GroupWindowAssigner<TimeWindow> windowAssigner, ZoneId 
shiftTimeZone) throws Exception {
         AlignedWindowTableFunctionOperator operator =
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
index 3bc2df1bea2..e7dd43d77cd 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.Ses
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link UnalignedWindowTableFunctionOperator}. */
@@ -156,6 +158,140 @@ public class UnalignedWindowTableFunctionOperatorTest 
extends WindowTableFunctio
         testHarness.close();
     }
 
+    @Test
+    public void testEventTimeSessionWindowsWithChangelog() throws Exception {
+        final SessionWindowAssigner assigner = 
SessionWindowAssigner.withGap(Duration.ofSeconds(3));
+        UnalignedWindowTableFunctionOperator operator = 
createOperator(assigner, ROW_TIME_INDEX);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.processElement(binaryRecord(RowKind.INSERT, "key1", 1, 
20L));
+        testHarness.processElement(binaryRecord(RowKind.DELETE, "key2", 1, 
1999L));
+        testHarness.processElement(binaryRecord(RowKind.INSERT, "key3", 1, 
2999L));
+        testHarness.processElement(binaryRecord(RowKind.INSERT, "key4", 1, 
1999L));
+        testHarness.processWatermark(new Watermark(999));
+        expectedOutput.add(new Watermark(999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processElement(binaryRecord(RowKind.DELETE, "key4", 1, 
2999L));
+
+        testHarness.processWatermark(new Watermark(1999));
+        expectedOutput.add(new Watermark(1999));
+
+        testHarness.processElement(binaryRecord(RowKind.UPDATE_BEFORE, "key3", 
1, 3999L));
+
+        testHarness.processElement(binaryRecord(RowKind.UPDATE_AFTER, "key3", 
1, 4999L));
+
+        testHarness.processWatermark(new Watermark(2999));
+        expectedOutput.add(new Watermark(2999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(3999));
+        // append 3 fields: window_start, window_end, window_time
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.INSERT, "key1", 1, 20L, localMills(20L), 
localMills(3020L), 3019L));
+        expectedOutput.add(new Watermark(3999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(4999));
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.DELETE,
+                        "key2",
+                        1,
+                        1999L,
+                        localMills(1999L),
+                        localMills(4999L),
+                        4998L));
+        expectedOutput.add(new Watermark(4999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(5999));
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.INSERT,
+                        "key4",
+                        1,
+                        1999L,
+                        localMills(1999L),
+                        localMills(5999L),
+                        5998L));
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.DELETE,
+                        "key4",
+                        1,
+                        2999L,
+                        localMills(1999L),
+                        localMills(5999L),
+                        5998L));
+        expectedOutput.add(new Watermark(5999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // do a snapshot, close and restore again
+        testHarness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+        testHarness.close();
+
+        expectedOutput.clear();
+        testHarness = createTestHarness(operator);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        testHarness.processWatermark(new Watermark(7999));
+
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.INSERT,
+                        "key3",
+                        1,
+                        2999L,
+                        localMills(2999L),
+                        localMills(7999L),
+                        7998L));
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.UPDATE_BEFORE,
+                        "key3",
+                        1,
+                        3999L,
+                        localMills(2999L),
+                        localMills(7999L),
+                        7998L));
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.UPDATE_AFTER,
+                        "key3",
+                        1,
+                        4999L,
+                        localMills(2999L),
+                        localMills(7999L),
+                        7998L));
+        expectedOutput.add(new Watermark(7999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.close();
+    }
+
     @Test
     public void testProcessTimeSessionWindows() throws Exception {
         final SessionWindowAssigner assigner =
@@ -246,6 +382,101 @@ public class UnalignedWindowTableFunctionOperatorTest 
extends WindowTableFunctio
         testHarness.close();
     }
 
+    @Test
+    public void testProcessTimeSessionWindowsWithChangelog() throws Exception {
+        final SessionWindowAssigner assigner =
+                
SessionWindowAssigner.withGap(Duration.ofSeconds(3)).withProcessingTime();
+        UnalignedWindowTableFunctionOperator operator = 
createOperator(assigner, -1);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        // timestamp is ignored in processing time
+        testHarness.setProcessingTime(20L);
+        testHarness.processElement(binaryRecord(RowKind.INSERT, "key1", 1, 
Long.MAX_VALUE));
+        testHarness.processElement(binaryRecord(RowKind.INSERT, "key2", 1, 
Long.MAX_VALUE));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.setProcessingTime(1999);
+        testHarness.processElement(binaryRecord(RowKind.DELETE, "key3", 1, 
Long.MAX_VALUE));
+        testHarness.processElement(binaryRecord(RowKind.DELETE, "key2", 1, 
Long.MAX_VALUE));
+
+        testHarness.setProcessingTime(2999);
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.setProcessingTime(3999L);
+
+        // append 3 fields: window_start, window_end, window_time
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.INSERT,
+                        "key1",
+                        1,
+                        Long.MAX_VALUE,
+                        localMills(20L),
+                        localMills(3020L),
+                        3019L));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // do a snapshot, close and restore again
+        testHarness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+        testHarness.close();
+
+        expectedOutput.clear();
+        testHarness = createTestHarness(operator);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        testHarness.setProcessingTime(4999);
+
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.INSERT,
+                        "key2",
+                        1,
+                        Long.MAX_VALUE,
+                        localMills(20L),
+                        localMills(4999L),
+                        4998L));
+
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.DELETE,
+                        "key3",
+                        1,
+                        Long.MAX_VALUE,
+                        localMills(1999L),
+                        localMills(4999L),
+                        4998L));
+
+        expectedOutput.add(
+                binaryRecord(
+                        RowKind.DELETE,
+                        "key2",
+                        1,
+                        Long.MAX_VALUE,
+                        localMills(20L),
+                        localMills(4999L),
+                        4998L));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.close();
+    }
+
     @Test
     public void testSessionWindowsWithoutPartitionKeys() throws Exception {
         final SessionWindowAssigner assigner = 
SessionWindowAssigner.withGap(Duration.ofSeconds(3));

Reply via email to