This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdbf7bfe66ce2cdd8ca84d1e481f3a9cc929171c
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Oct 10 14:26:59 2025 +0000

    [FLINK-38460] Add SinkUpsertMaterializer retraction tests
---
 .../operators/sink/SinkUpsertMaterializerTest.java | 93 ++++++++++++++++++++++
 1 file changed, 93 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index f8fb9754f26..99c0e710c13 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
@@ -50,6 +52,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.api.java.tuple.Tuple2.of;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
@@ -172,6 +175,96 @@ public class SinkUpsertMaterializerTest {
         testHarness.close();
     }
 
+    @Test
+    public void testRetractionWithoutUpsertKey() throws Exception {
+        testRetractions((int[]) null);
+    }
+
+    @Test
+    public void testRetractionWithUpsertKey() throws Exception {
+        testRetractions(UPSERT_KEY);
+    }
+
+    public void testRetractions(int... upsertKey) throws Exception {
+        testThreeElementProcessing(
+                "retract first - should emit nothing until empty - then delete",
+                upsertKey,
+                of(deleteRecord(1L, 1, "a1"), null),
+                of(deleteRecord(2L, 1, "a2"), null),
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.DELETE, 3L, 1, "a3")));
+        testThreeElementProcessing(
+                "retract middle - should emit nothing until empty - then delete",
+                upsertKey,
+                of(deleteRecord(2L, 1, "a2"), null),
+                of(deleteRecord(1L, 1, "a1"), null),
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.DELETE, 3L, 1, "a3")));
+        testThreeElementProcessing(
+                "retract last - should emit penultimate until empty - then delete",
+                upsertKey,
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")),
+                of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")),
+                of(deleteRecord(1L, 1, "a1"), rowOfKind(RowKind.DELETE, 1L, 1, "a1")));
+        testThreeElementProcessing(
+                "retract in arbitrary order: 1,3,2",
+                upsertKey,
+                of(deleteRecord(1L, 1, "a1"), null),
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")),
+                of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.DELETE, 2L, 1, "a2")));
+        testThreeElementProcessing(
+                "retract in arbitrary order: 2,3,1",
+                upsertKey,
+                of(deleteRecord(2L, 1, "a2"), null),
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")),
+                of(deleteRecord(1L, 1, "a1"), rowOfKind(RowKind.DELETE, 1L, 1, "a1")));
+        testThreeElementProcessing(
+                "retract in arbitrary order: 3,1,2",
+                upsertKey,
+                of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")),
+                of(deleteRecord(1L, 1, "a1"), null),
+                of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.DELETE, 2L, 1, "a2")));
+    }
+
+    // boilerplate for common test case of processing starting with three elements
+    @SafeVarargs
+    private void testThreeElementProcessing(
+            String description,
+            int[] upsertKey,
+            Tuple2<StreamRecord<RowData>, RowData>... inputAndOutput)
+            throws Exception {
+        @SuppressWarnings("rawtypes")
+        Tuple2[] merged = new Tuple2[inputAndOutput.length + 3];
+        merged[0] = of(insertRecord(1L, 1, "a1"), rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        merged[1] = of(insertRecord(2L, 1, "a2"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
+        merged[2] = of(insertRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+        System.arraycopy(inputAndOutput, 0, merged, 3, inputAndOutput.length);
+        testElementProcessing(description, upsertKey, merged);
+    }
+
+    @SafeVarargs
+    private void testElementProcessing(
+            String description,
+            int[] upsertKey,
+            Tuple2<StreamRecord<RowData>, RowData>... inputAndOutput)
+            throws Exception {
+        OneInputStreamOperator<RowData, RowData> materializer =
+                createOperator(LOGICAL_TYPES, upsertKey);
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(materializer);
+
+        testHarness.open();
+
+        for (Tuple2<StreamRecord<RowData>, RowData> el0 : inputAndOutput) {
+            testHarness.processElement(el0.f0);
+            if (el0.f1 == null) {
+                ASSERTOR.shouldEmitNothing(testHarness);
+            } else {
+                ASSERTOR.shouldEmit(testHarness, description, el0.f1);
+            }
+        }
+
+        testHarness.close();
+    }
+
     private static class TestRecordEqualiser implements RecordEqualiser {
         @Override
         public boolean equals(RowData row1, RowData row2) {
