This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ca03ed435b1c68b3381b93bb18d37d04cb68161f Author: Roman Khachatryan <[email protected]> AuthorDate: Fri Oct 10 14:17:47 2025 +0000 [FLINK-38460] Add SinkUpsertMaterializer recovery test --- .../operators/sink/SinkUpsertMaterializerTest.java | 24 ++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java index 86014220b81..cd92c2fe702 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.operators.sink; import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; @@ -211,4 +212,27 @@ public class SinkUpsertMaterializerTest { testHarness.setStateBackend(backend.create(true)); return testHarness; } + + @Test + public void testEmptyUpsertKey() throws Exception { + testRecovery(createOperator(LOGICAL_TYPES), createOperatorWithoutUpsertKey()); + testRecovery(createOperatorWithoutUpsertKey(), createOperator(LOGICAL_TYPES)); + } + + private void testRecovery( + OneInputStreamOperator<RowData, RowData> from, + OneInputStreamOperator<RowData, RowData> to) + throws Exception { + OperatorSubtaskState snapshot; + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(from)) { + testHarness.open(); + snapshot = testHarness.snapshot(1L, 1L); + } + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness = + createHarness(to)) { + testHarness.initializeState(snapshot); + testHarness.open(); + } + } }
