This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69e858ffd26f47bc9c8f52de5066d9c14bc5cf8e Author: Roman Khachatryan <[email protected]> AuthorDate: Fri Oct 10 14:27:28 2025 +0000 [FLINK-38460] Add SinkUpsertMaterializer serializer and equalizer tests --- .../operators/sink/SinkUpsertMaterializerTest.java | 70 ++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java index 99c0e710c13..f8c9a44017c 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java @@ -53,6 +53,7 @@ import java.util.List; import java.util.stream.Collectors; import static org.apache.flink.api.java.tuple.Tuple2.of; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; @@ -100,6 +101,75 @@ public class SinkUpsertMaterializerTest { } }; + /** + * If the composite serializer in {@link SinkUpsertMaterializer} works on projected fields then + * it might use the wrong field serializer, e.g. the one for {@link VarCharType} instead of the one + * for {@link IntType}. That might cause {@link ArrayIndexOutOfBoundsException} because the string + * serializer expects the first number to be the length of the string.
+ */ + @Test + public void testUpsertKeySerializerFailure() throws Exception { + LogicalType[] types = new LogicalType[] {new VarCharType(), new IntType()}; + // project the INT field (index 1), while the first field of the original record is a string + + OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1); + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(materializer, stateBackend, types)) { + testHarness.open(); + RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types); + // -1 is not a valid string length + testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", -1)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", -1)); + + // 999 as a string length is too long + testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", 999)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", 999)); + } + } + + @Test + public void testUpsertKeySerializerSilentCorruption() throws Exception { + LogicalType[] types = + new LogicalType[] {new VarCharType(), new BigIntType(), new IntType()}; + // project the BIGINT field (index 1), while the first field of the original record is a string + + OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1); + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(materializer, stateBackend, types)) { + testHarness.open(); + RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types); + + // a wrong (string) serializer might read the BIGINT key 32 as a 32-character string length, potentially including "97" + testHarness.processElement(binaryRecord(RowKind.INSERT, "any string", 32L, 97)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, "any string", 32L, 97)); + + // but here it might include "98", so the DELETE would not match the INSERT: no output and test failure + testHarness.processElement(binaryRecord(RowKind.DELETE, "any string", 32L, 98)); + assertor.shouldEmit(testHarness, 
rowOfKind(RowKind.DELETE, "any string", 32L, 98)); + } + } + + @Test + public void testUpsertEqualizer() throws Exception { + LogicalType[] types = new LogicalType[] {new IntType(), new BigIntType()}; + + OneInputStreamOperator<RowData, RowData> materializer = createOperator(types, 1); + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(materializer, stateBackend, types)) { + testHarness.open(); + RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types); + + // upsert key is 33 (field 1); field 0 is not part of the key and differs between the two records + testHarness.processElement(binaryRecord(RowKind.INSERT, 0, 33L)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 0, 33L)); + + // same upsert key 33 but field 0 changed to 1 - should remove AND clear the state via the upsert equalizer + // the equalizer might fail to match if it is used on un-projected (full) records + testHarness.processElement(binaryRecord(RowKind.DELETE, 1, 33L)); + assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1, 33L)); + } + } + @Test public void testNoUpsertKeyFlow() throws Exception { KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
