This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69e858ffd26f47bc9c8f52de5066d9c14bc5cf8e
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Oct 10 14:27:28 2025 +0000

    [FLINK-38460] Add SinkUpsertMaterializer serializer and equalizer tests
---
 .../operators/sink/SinkUpsertMaterializerTest.java | 70 ++++++++++++++++++++++
 1 file changed, 70 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index 99c0e710c13..f8c9a44017c 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -53,6 +53,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.api.java.tuple.Tuple2.of;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
@@ -100,6 +101,75 @@ public class SinkUpsertMaterializerTest {
                 }
             };
 
+    /**
+     * If the composite serializer in {@link SinkUpsertMaterializer} works on projected fields then
+     * it might use the wrong serializer, e.g. the {@link VarCharType} instead of the {@link
+     * IntType}. That might cause {@link ArrayIndexOutOfBoundsException} because the string
+     * serializer expects the first number to be the length of the string.
+     */
+    @Test
+    public void testUpsertKeySerializerFailure() throws Exception {
+        LogicalType[] types = new LogicalType[] {new VarCharType(), new IntType()};
+        // upsert key is the int field at index 1; field 0 of the full record is a string
+
+        OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1);
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(materializer, stateBackend, types)) {
+            testHarness.open();
+            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
+            // -1 is not a valid string length
+            testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", -1));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", -1));
+
+            // 999 as a string length is too long
+            testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", 999));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", 999));
+        }
+    }
+
+    @Test
+    public void testUpsertKeySerializerSilentCorruption() throws Exception {
+        LogicalType[] types =
+                new LogicalType[] {new VarCharType(), new BigIntType(), new IntType()};
+        // upsert key is the bigint field at index 1; field 0 of the full record is a string
+
+        OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1);
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(materializer, stateBackend, types)) {
+            testHarness.open();
+            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
+
+            // this might serialize upsert key as 32-character string potentially including "97"
+            testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", 32L, 97));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", 32L, 97));
+
+            // but here it might include "98" which would result in no output and test failure
+            testHarness.processElement(binaryRecord(RowKind.DELETE, "any string", 32L, 98));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, "any string", 32L, 98));
+        }
+    }
+
+    @Test
+    public void testUpsertEqualizer() throws Exception {
+        LogicalType[] types = new LogicalType[] {new IntType(), new BigIntType()};
+
+        OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1);
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                createHarness(materializer, stateBackend, types)) {
+            testHarness.open();
+            RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
+
+            // upsert key is 33 (field 1); the 0 in field 0 is not part of the key
+            testHarness.processElement(binaryRecord(RowKind.INSERT, 0, 33L));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 0, 33L));
+
+            // upsert key 33 - should remove AND clear the state involving upsert equalizer
+            // equalizer might fail if it's used on un-projected records (field 0 differs: 1 vs 0)
+            testHarness.processElement(binaryRecord(RowKind.DELETE, 1, 33L));
+            assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1, 33L));
+        }
+    }
+
     @Test
     public void testNoUpsertKeyFlow() throws Exception {
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =

Reply via email to