This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 742730ff718ed493896e1d3d14ff66a4f86257e6
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Oct 10 13:54:32 2025 +0000

    [hotfix] Refactor SinkUpsertMaterializer tests
---
 .../sink/SinkUpsertMaterializerStateBackend.java   |  41 +++++++
 .../operators/sink/SinkUpsertMaterializerTest.java | 124 ++++++++++++++-------
 .../table/runtime/util/RowDataHarnessAssertor.java |   8 ++
 3 files changed, 133 insertions(+), 40 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java
new file mode 100644
index 00000000000..b675df2893e
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerStateBackend.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+
+/** StateBackend to test SinkUpsertMaterializer with. */
+public enum SinkUpsertMaterializerStateBackend {
+    HEAP {
+
+        public StateBackend create(boolean incrementalIfSupported) {
+            return new HashMapStateBackend();
+        }
+    },
+    ROCKSDB {
+
+        public StateBackend create(boolean incrementalIfSupported) {
+            return new EmbeddedRocksDBStateBackend(incrementalIfSupported);
+        }
+    };
+
+    public abstract StateBackend create(boolean incrementalIfSupported);
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index 322ac9e1ceb..86014220b81 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -19,22 +19,28 @@ package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.utils.HandwrittenSelectorUtil;
 import org.apache.flink.types.RowKind;
 
-import org.junit.jupiter.api.Test;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
@@ -42,17 +48,29 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
 
 /** Test for {@link SinkUpsertMaterializer}. */
-class SinkUpsertMaterializerTest {
+@RunWith(Parameterized.class)
+public class SinkUpsertMaterializerTest {
+
+    static final int UPSERT_KEY = 0;
+
+    @Parameter public SinkUpsertMaterializerStateBackend stateBackend;
+
+    @Parameterized.Parameters(name = "stateBackend={0}")
+    public static Object[][] generateTestParameters() {
+        List<Object[]> result = new ArrayList<>();
+        for (SinkUpsertMaterializerStateBackend backend :
+                SinkUpsertMaterializerStateBackend.values()) {
+            result.add(new Object[] {backend});
+        }
+        return result.toArray(new Object[0][]);
+    }
 
-    private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1000);
-    private final LogicalType[] types =
+    static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig(1000);
+    static final LogicalType[] LOGICAL_TYPES =
             new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
-    private final RowDataSerializer serializer = new RowDataSerializer(types);
-    private final RowDataKeySelector keySelector =
-            HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types);
-    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(types);
+    static final RowDataHarnessAssertor ASSERTOR = new RowDataHarnessAssertor(LOGICAL_TYPES);
 
-    private final GeneratedRecordEqualiser equaliser =
+    static final GeneratedRecordEqualiser EQUALISER =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
 
                 @Override
@@ -61,7 +79,7 @@ class SinkUpsertMaterializerTest {
                 }
             };
 
-    private final GeneratedRecordEqualiser upsertKeyEqualiser =
+    static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
            new GeneratedRecordEqualiser("", "", new Object[0]) {
 
                 @Override
@@ -71,82 +89,76 @@ class SinkUpsertMaterializerTest {
             };
 
     @Test
-    void test() throws Exception {
-        SinkUpsertMaterializer materializer =
-                new SinkUpsertMaterializer(
-                        ttlConfig, serializer, equaliser, upsertKeyEqualiser, null);
+    public void testNoUpsertKeyFlow() throws Exception {
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
-                new KeyedOneInputStreamOperatorTestHarness<>(
-                        materializer, keySelector, keySelector.getProducedType());
+                createHarness(createOperatorWithoutUpsertKey());
         testHarness.open();
 
         testHarness.setStateTtlProcessingTime(1);
 
         testHarness.processElement(insertRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
 
         testHarness.processElement(insertRecord(2L, 1, "a2"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
 
         testHarness.processElement(insertRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
 
         testHarness.processElement(deleteRecord(2L, 1, "a2"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.processElement(deleteRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
 
         testHarness.processElement(deleteRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
 
         testHarness.processElement(insertRecord(4L, 1, "a4"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.close();
     }
 
     @Test
-    void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception {
-        SinkUpsertMaterializer materializer =
-                new SinkUpsertMaterializer(
-                        ttlConfig, serializer, equaliser, upsertKeyEqualiser, new int[] {0});
+    public void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception {
+        OneInputStreamOperator<RowData, RowData> materializer =
+                createOperator(LOGICAL_TYPES, UPSERT_KEY);
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
-                new KeyedOneInputStreamOperatorTestHarness<>(
-                        materializer, keySelector, keySelector.getProducedType());
+                createHarness(materializer);
         testHarness.open();
 
         testHarness.setStateTtlProcessingTime(1);
 
         testHarness.processElement(insertRecord(1L, 1, "a1"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
 
         testHarness.processElement(updateAfterRecord(1L, 1, "a11"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
 
         testHarness.processElement(insertRecord(3L, 1, "a3"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
 
         testHarness.processElement(deleteRecord(1L, 1, "a111"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.processElement(deleteRecord(3L, 1, "a33"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
 
         testHarness.processElement(insertRecord(4L, 1, "a4"));
-        assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+        ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
         testHarness.processElement(deleteRecord(4L, 1, "a4"));
-        assertor.shouldEmitNothing(testHarness);
+        ASSERTOR.shouldEmitNothing(testHarness);
 
         testHarness.close();
     }
@@ -162,9 +174,41 @@ class SinkUpsertMaterializerTest {
     }
 
     private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+
        @Override
        public boolean equals(RowData row1, RowData row2) {
-            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
+            return row1.getRowKind() == row2.getRowKind()
+                    && row1.getLong(UPSERT_KEY) == row2.getLong(UPSERT_KEY);
        }
    }
+
+    private OneInputStreamOperator<RowData, RowData> createOperatorWithoutUpsertKey() {
+        return createOperator(LOGICAL_TYPES, (int[]) null);
+    }
+
+    private OneInputStreamOperator<RowData, RowData> createOperator(
+            LogicalType[] types, int... upsertKey) {
+        return SinkUpsertMaterializer.create(
+                TTL_CONFIG, RowType.of(types), EQUALISER, UPSERT_KEY_EQUALISER, upsertKey);
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+            OneInputStreamOperator<RowData, RowData> m2) throws Exception {
+        return createHarness(m2, stateBackend, LOGICAL_TYPES);
+    }
+
+    static KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+            OneInputStreamOperator<RowData, RowData> materializer,
+            SinkUpsertMaterializerStateBackend backend,
+            LogicalType[] types)
+            throws Exception {
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        materializer,
+                        HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types),
+                        HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types)
+                                .getProducedType());
+        testHarness.setStateBackend(backend.create(true));
+        return testHarness;
+    }
 }
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
index d7fbef9f162..b4b074f7249 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/RowDataHarnessAssertor.java
@@ -61,6 +61,14 @@ public class RowDataHarnessAssertor {
         assertThat(getEmittedRows(harness)).containsExactly(expected);
     }
 
+    /** Assert the test harness should emit records exactly same as the expected records. */
+    public void shouldEmit(
+            AbstractStreamOperatorTestHarness<RowData> harness,
+            String description,
+            RowData... expected) {
+        assertThat(getEmittedRows(harness)).describedAs(description).containsExactly(expected);
+    }
+
     /** Assert the test harness should emit all records regardless of the order. */
     public void shouldEmitAll(
             AbstractStreamOperatorTestHarness<RowData> harness, RowData... expected) {
