This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ca03ed435b1c68b3381b93bb18d37d04cb68161f
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Oct 10 14:17:47 2025 +0000

    [FLINK-38460] Add SinkUpsertMaterializer recovery test
---
 .../operators/sink/SinkUpsertMaterializerTest.java | 24 ++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index 86014220b81..cd92c2fe702 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
@@ -211,4 +212,27 @@ public class SinkUpsertMaterializerTest {
         testHarness.setStateBackend(backend.create(true));
         return testHarness;
     }
+
+    @Test
+    public void testEmptyUpsertKey() throws Exception {
+        testRecovery(createOperator(LOGICAL_TYPES), 
createOperatorWithoutUpsertKey());
+        testRecovery(createOperatorWithoutUpsertKey(), 
createOperator(LOGICAL_TYPES));
+    }
+
+    private void testRecovery(
+            OneInputStreamOperator<RowData, RowData> from,
+            OneInputStreamOperator<RowData, RowData> to)
+            throws Exception {
+        OperatorSubtaskState snapshot;
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
+                createHarness(from)) {
+            testHarness.open();
+            snapshot = testHarness.snapshot(1L, 1L);
+        }
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
+                createHarness(to)) {
+            testHarness.initializeState(snapshot);
+            testHarness.open();
+        }
+    }
 }

Reply via email to