This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 742730ff718ed493896e1d3d14ff66a4f86257e6
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Oct 10 13:54:32 2025 +0000

    [hotfix] Refactor SinkUpsertMaterializer tests
---
 .../sink/SinkUpsertMaterializerStateBackend.java   |  41 +++++++
 .../operators/sink/SinkUpsertMaterializerTest.java | 124 ++++++++++++++-------
 .../table/runtime/util/RowDataHarnessAssertor.java |   8 ++
 3 files changed, 133 insertions(+), 40 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java
new file mode 100644
index 00000000000..b675df2893e
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+
+/** State backends to run {@link SinkUpsertMaterializer} tests against. */
+public enum SinkUpsertMaterializerStateBackend {
+    HEAP {
+        @Override
+        public StateBackend create(boolean incrementalIfSupported) {
+            return new HashMapStateBackend(); // incrementalIfSupported is ignored here
+        }
+    },
+    ROCKSDB {
+        @Override
+        public StateBackend create(boolean incrementalIfSupported) {
+            return new EmbeddedRocksDBStateBackend(incrementalIfSupported);
+        }
+    };
+
+    public abstract StateBackend create(boolean incrementalIfSupported);
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index 322ac9e1ceb..86014220b81 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -19,22 +19,28 @@
 package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.utils.HandwrittenSelectorUtil;
 import org.apache.flink.types.RowKind;
 
-import org.junit.jupiter.api.Test;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import java.util.ArrayList;
+import java.util.List;
 
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
@@ -42,17 +48,29 @@ import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
 
 /** Test for {@link SinkUpsertMaterializer}. */
-class SinkUpsertMaterializerTest {
+@RunWith(Parameterized.class)
+public class SinkUpsertMaterializerTest {
+
+    static final int UPSERT_KEY = 0;
+
+    @Parameter public SinkUpsertMaterializerStateBackend stateBackend;
+
+    @Parameterized.Parameters(name = "stateBackend={0}")
+    public static Object[][] generateTestParameters() {
+        List<Object[]> result = new ArrayList<>(); // one parameter set per enum constant
+        for (SinkUpsertMaterializerStateBackend backend :
+                SinkUpsertMaterializerStateBackend.values()) {
+            result.add(new Object[] {backend}); // single argument, injected into stateBackend
+        }
+        return result.toArray(new Object[0][]);
+    }
 
-    private final StateTtlConfig ttlConfig = 
StateConfigUtil.createTtlConfig(1000);
-    private final LogicalType[] types =
+    static final StateTtlConfig TTL_CONFIG = 
StateConfigUtil.createTtlConfig(1000);
+    static final LogicalType[] LOGICAL_TYPES =
             new LogicalType[] {new BigIntType(), new IntType(), new 
VarCharType()};
-    private final RowDataSerializer serializer = new RowDataSerializer(types);
-    private final RowDataKeySelector keySelector =
-            HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types);
-    private final RowDataHarnessAssertor assertor = new 
RowDataHarnessAssertor(types);
+    static final RowDataHarnessAssertor ASSERTOR = new 
RowDataHarnessAssertor(LOGICAL_TYPES);
 
-    private final GeneratedRecordEqualiser equaliser =
+    static final GeneratedRecordEqualiser EQUALISER =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
 
                 @Override
@@ -61,7 +79,7 @@ class SinkUpsertMaterializerTest {
                 }
             };
 
-    private final GeneratedRecordEqualiser upsertKeyEqualiser =
+    static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
 
                 @Override
@@ -71,82 +89,76 @@ class SinkUpsertMaterializerTest {
             };
 
     @Test
-    void test() throws Exception {
-        SinkUpsertMaterializer materializer =
-                new SinkUpsertMaterializer(
-                        ttlConfig, serializer, equaliser, upsertKeyEqualiser, 
null);
+    public void testNoUpsertKeyFlow() throws Exception {
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
-                new KeyedOneInputStreamOperatorTestHarness<>(
-                        materializer, keySelector, 
keySelector.getProducedType());
+                createHarness(createOperatorWithoutUpsertKey());
 
         testHarness.open();
 
         testHarness.setStateTtlProcessingTime(1);
 
         testHarness.processElement(insertRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, 
"a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, 
"a1"));
 
         testHarness.processElement(insertRecord(2L, 1, "a2"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 
1, "a2"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 
1, "a2"));
 
         testHarness.processElement(insertRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 
1, "a3"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 
1, "a3"));
 
         testHarness.processElement(deleteRecord(2L, 1, "a2"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.processElement(deleteRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 
1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 
1, "a1"));
 
         testHarness.processElement(deleteRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, 
"a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, 
"a1"));
 
         testHarness.processElement(insertRecord(4L, 1, "a4"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, 
"a4"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, 
"a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
 
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.close();
     }
 
     @Test
-    void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception {
-        SinkUpsertMaterializer materializer =
-                new SinkUpsertMaterializer(
-                        ttlConfig, serializer, equaliser, upsertKeyEqualiser, 
new int[] {0});
+    public void testInputHasUpsertKeyWithNonDeterministicColumn() throws 
Exception {
+        OneInputStreamOperator<RowData, RowData> materializer =
+                createOperator(LOGICAL_TYPES, UPSERT_KEY);
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
-                new KeyedOneInputStreamOperatorTestHarness<>(
-                        materializer, keySelector, 
keySelector.getProducedType());
+                createHarness(materializer);
 
         testHarness.open();
 
         testHarness.setStateTtlProcessingTime(1);
 
         testHarness.processElement(insertRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, 
"a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, 
"a1"));
 
         testHarness.processElement(updateAfterRecord(1L, 1, "a11"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 
1, "a11"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 
1, "a11"));
 
         testHarness.processElement(insertRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 
1, "a3"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 
1, "a3"));
 
         testHarness.processElement(deleteRecord(1L, 1, "a111"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.processElement(deleteRecord(3L, 1, "a33"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, 
"a33"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, 
"a33"));
 
         testHarness.processElement(insertRecord(4L, 1, "a4"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, 
"a4"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, 
"a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
 
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.close();
     }
@@ -162,9 +174,41 @@ class SinkUpsertMaterializerTest {
     }
 
     private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+
         @Override
         public boolean equals(RowData row1, RowData row2) {
-            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) 
== row2.getLong(0);
+            return row1.getRowKind() == row2.getRowKind()
+                    && row1.getLong(UPSERT_KEY) == row2.getLong(UPSERT_KEY);
         }
     }
+
+    private OneInputStreamOperator<RowData, RowData> 
createOperatorWithoutUpsertKey() {
+        return createOperator(LOGICAL_TYPES, (int[]) null); // null array: no upsert key
+    }
+
+    private OneInputStreamOperator<RowData, RowData> createOperator(
+            LogicalType[] types, int... upsertKey) {
+        return SinkUpsertMaterializer.create(
+                TTL_CONFIG, RowType.of(types), EQUALISER, 
UPSERT_KEY_EQUALISER, upsertKey);
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+            OneInputStreamOperator<RowData, RowData> materializer) throws Exception {
+        return createHarness(materializer, stateBackend, LOGICAL_TYPES);
+    }
+
+    static KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
createHarness(
+            OneInputStreamOperator<RowData, RowData> materializer,
+            SinkUpsertMaterializerStateBackend backend,
+            LogicalType[] types)
+            throws Exception {
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        materializer,
+                        HandwrittenSelectorUtil.getRowDataSelector(new int[] 
{1}, types),
+                        HandwrittenSelectorUtil.getRowDataSelector(new int[] 
{1}, types)
+                                .getProducedType());
+        testHarness.setStateBackend(backend.create(true)); // true = incrementalIfSupported
+        return testHarness; // not opened here; the tests call open() themselves
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
index d7fbef9f162..b4b074f7249 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
@@ -61,6 +61,14 @@ public class RowDataHarnessAssertor {
         assertThat(getEmittedRows(harness)).containsExactly(expected);
     }
 
+    /** Asserts the emitted rows are exactly {@code expected}, tagged with {@code description}. */
+    public void shouldEmit(
+            AbstractStreamOperatorTestHarness<RowData> harness,
+            String description,
+            RowData... expected) {
+        
assertThat(getEmittedRows(harness)).describedAs(description).containsExactly(expected);
+    }
+
     /** Assert the test harness should emit all records regardless of the 
order. */
     public void shouldEmitAll(
             AbstractStreamOperatorTestHarness<RowData> harness, RowData... 
expected) {

Reply via email to