This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cdbf7bfe66ce2cdd8ca84d1e481f3a9cc929171c Author: Roman Khachatryan <[email protected]> AuthorDate: Fri Oct 10 14:26:59 2025 +0000 [FLINK-38460] Add SinkUpsertMaterializer retraction tests --- .../operators/sink/SinkUpsertMaterializerTest.java | 93 ++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java index f8fb9754f26..99c0e710c13 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java @@ -19,12 +19,14 @@ package org.apache.flink.table.runtime.operators.sink; import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; @@ -50,6 +52,7 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.api.java.tuple.Tuple2.of; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; @@ -172,6 +175,96 @@ public class SinkUpsertMaterializerTest { testHarness.close(); } + @Test + public void testRetractionWithoutUpsertKey() throws Exception { + testRetractions((int[]) null); + } + + @Test + public void testRetractionWithUpsertKey() throws Exception { + testRetractions(UPSERT_KEY); + } + + public void testRetractions(int... upsertKey) throws Exception { + testThreeElementProcessing( + "retract first - should emit nothing until empty - then delete", + upsertKey, + of(deleteRecord(1L, 1, "a1"), null), + of(deleteRecord(2L, 1, "a2"), null), + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.DELETE, 3L, 1, "a3"))); + testThreeElementProcessing( + "retract middle - should emit nothing until empty - then delete", + upsertKey, + of(deleteRecord(2L, 1, "a2"), null), + of(deleteRecord(1L, 1, "a1"), null), + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.DELETE, 3L, 1, "a3"))); + testThreeElementProcessing( + "retract last - should emit penultimate until empty - then delete", + upsertKey, + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), + of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")), + of(deleteRecord(1L, 1, "a1"), rowOfKind(RowKind.DELETE, 1L, 1, "a1"))); + testThreeElementProcessing( + "retract in arbitrary order: 1,3,2", + upsertKey, + of(deleteRecord(1L, 1, "a1"), null), + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), + of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.DELETE, 2L, 1, "a2"))); + testThreeElementProcessing( + "retract in arbitrary order: 2,3,1", + upsertKey, + of(deleteRecord(2L, 1, "a2"), null), + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1")), + of(deleteRecord(1L, 1, "a1"), rowOfKind(RowKind.DELETE, 1L, 1, "a1"))); + testThreeElementProcessing( + "retract in arbitrary order: 3,1,2", + upsertKey, + of(deleteRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")), + of(deleteRecord(1L, 1, "a1"), null), + of(deleteRecord(2L, 1, "a2"), rowOfKind(RowKind.DELETE, 2L, 1, "a2"))); + } + + // boilerplate for common test case of processing starting with three elements + @SafeVarargs + private void testThreeElementProcessing( + String description, + int[] upsertKey, + Tuple2<StreamRecord<RowData>, RowData>... inputAndOutput) + throws Exception { + @SuppressWarnings("rawtypes") + Tuple2[] merged = new Tuple2[inputAndOutput.length + 3]; + merged[0] = of(insertRecord(1L, 1, "a1"), rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + merged[1] = of(insertRecord(2L, 1, "a2"), rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2")); + merged[2] = of(insertRecord(3L, 1, "a3"), rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + System.arraycopy(inputAndOutput, 0, merged, 3, inputAndOutput.length); + testElementProcessing(description, upsertKey, merged); + } + + @SafeVarargs + private void testElementProcessing( + String description, + int[] upsertKey, + Tuple2<StreamRecord<RowData>, RowData>... inputAndOutput) + throws Exception { + OneInputStreamOperator<RowData, RowData> materializer = + createOperator(LOGICAL_TYPES, upsertKey); + KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(materializer); + + testHarness.open(); + + for (Tuple2<StreamRecord<RowData>, RowData> el0 : inputAndOutput) { + testHarness.processElement(el0.f0); + if (el0.f1 == null) { + ASSERTOR.shouldEmitNothing(testHarness); + } else { + ASSERTOR.shouldEmit(testHarness, description, el0.f1); + } + } + + testHarness.close(); + } + private static class TestRecordEqualiser implements RecordEqualiser { @Override public boolean equals(RowData row1, RowData row2) {
