This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4cd638dbc5818f9e5e7a84daaa29ded47109e801 Author: Roman Khachatryan <[email protected]> AuthorDate: Tue Sep 30 10:34:34 2025 +0200 [FLINK-38460] Add SinkUpsertMaterializerMigrationTest --- .../sink/SinkUpsertMaterializerMigrationTest.java | 189 +++++++++++++++++++++ .../migration-flink-2.2-HEAP-V1-snapshot | Bin 0 -> 2566 bytes .../migration-flink-2.2-ROCKSDB-V1-snapshot | Bin 0 -> 16099 bytes pom.xml | 1 + 4 files changed, 190 insertions(+) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java new file mode 100644 index 00000000000..1ab7ac4ae84 --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerMigrationTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.test.util.MigrationTest; +import org.apache.flink.types.RowKind; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.FlinkVersion.current; +import static org.apache.flink.streaming.util.OperatorSnapshotUtil.getResourceFilename; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.ASSERTOR; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.EQUALISER; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.LOGICAL_TYPES; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.TTL_CONFIG; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY; +import static org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.UPSERT_KEY_EQUALISER; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; + +/** Test for {@link SinkUpsertMaterializer} migration. */ +@RunWith(Parameterized.class) +public class SinkUpsertMaterializerMigrationTest implements MigrationTest { + + private static final String FOLDER_NAME = "sink-upsert-materializer"; + + @Parameterized.Parameter(0) + @SuppressWarnings({"ClassEscapesDefinedScope", "DefaultAnnotationParam"}) + public SinkOperationMode migrateFrom; + + @Parameterized.Parameter(1) + @SuppressWarnings("ClassEscapesDefinedScope") + public SinkOperationMode migrateTo; + + @Parameterized.Parameters(name = "{0} -> {1}") + public static List<Object[]> parameters() { + List<Object[]> result = new ArrayList<>(); + Set<FlinkVersion> versions = FlinkVersion.rangeOf(FlinkVersion.v2_2, FlinkVersion.v2_2); + for (FlinkVersion fromVersion : versions) { + for (SinkUpsertMaterializerStateBackend backend : + SinkUpsertMaterializerStateBackend.values()) { + result.add( + new Object[] { + new SinkOperationMode(fromVersion, backend), + new SinkOperationMode(current(), backend) + }); + } + } + return result; + } + + @Test + public void testMigration() throws Exception { + String path = getResourceFilename(FOLDER_NAME + "/" + getFileName(migrateFrom)); + try (OneInputStreamOperatorTestHarness<RowData, RowData> harness = + createHarness(migrateTo, path)) { + testCorrectnessAfterSnapshot(harness); + } + } + + private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness( + SinkOperationMode mode, String snapshotPath) throws Exception { + int[] inputUpsertKey = {UPSERT_KEY}; + OneInputStreamOperator<RowData, RowData> materializer = + SinkUpsertMaterializer.create( + TTL_CONFIG, + RowType.of(LOGICAL_TYPES), + EQUALISER, + UPSERT_KEY_EQUALISER, + inputUpsertKey); + KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + SinkUpsertMaterializerTest.createHarness( + materializer, mode.stateBackend, LOGICAL_TYPES); + harness.setup(new RowDataSerializer(LOGICAL_TYPES)); + if (snapshotPath != null) { + harness.initializeState(snapshotPath); + } + harness.open(); + harness.setStateTtlProcessingTime(1); + return harness; + } + + private void testCorrectnessBeforeSnapshot( + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception { + + testHarness.processElement(insertRecord(1L, 1, "a1")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + testHarness.processElement(updateAfterRecord(1L, 1, "a11")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11")); + + testHarness.processElement(insertRecord(3L, 1, "a3")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3")); + } + + private void testCorrectnessAfterSnapshot( + OneInputStreamOperatorTestHarness<RowData, RowData> testHarness) throws Exception { + testHarness.processElement(deleteRecord(1L, 1, "a111")); + ASSERTOR.shouldEmitNothing(testHarness); + + testHarness.processElement(deleteRecord(3L, 1, "a33")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33")); + + testHarness.processElement(insertRecord(4L, 1, "a4")); + ASSERTOR.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4")); + + testHarness.setStateTtlProcessingTime(1002); + testHarness.processElement(deleteRecord(4L, 1, "a4")); + ASSERTOR.shouldEmitNothing(testHarness); + } + + private static String getFileName(SinkOperationMode mode) { + return String.format( + "migration-flink-%s-%s-%s-snapshot", mode.version, mode.stateBackend, "V1"); + } + + @SnapshotsGenerator + public void writeSnapshot(FlinkVersion version) throws Exception { + for (SinkUpsertMaterializerStateBackend stateBackend : + SinkUpsertMaterializerStateBackend.values()) { + SinkOperationMode mode = new SinkOperationMode(version, stateBackend); + try (OneInputStreamOperatorTestHarness<RowData, RowData> harness = + createHarness(mode, null)) { + testCorrectnessBeforeSnapshot(harness); + Path parent = Paths.get("src/test/resources", FOLDER_NAME); + Files.createDirectories(parent); + OperatorSnapshotUtil.writeStateHandle( + harness.snapshot(1L, 1L), parent.resolve(getFileName(mode)).toString()); + } + } + } + + public static void main(String... s) throws Exception { + // Run this to manually generate snapshot files for migration tests + // set working directory to flink-table/flink-table-runtime/ + new SinkUpsertMaterializerMigrationTest().writeSnapshot(current()); + } + + private static class SinkOperationMode { + private final FlinkVersion version; + private final SinkUpsertMaterializerStateBackend stateBackend; + + private SinkOperationMode( + FlinkVersion version, SinkUpsertMaterializerStateBackend stateBackend) { + this.version = version; + this.stateBackend = stateBackend; + } + + @Override + public String toString() { + return String.format("flink=%s, state=%s}", version, stateBackend); + } + } +} diff --git a/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot new file mode 100644 index 00000000000..be4fd71e72a Binary files /dev/null and b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-HEAP-V1-snapshot differ diff --git a/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot new file mode 100644 index 00000000000..1832c7a50eb Binary files /dev/null and b/flink-table/flink-table-runtime/src/test/resources/sink-upsert-materializer/migration-flink-2.2-ROCKSDB-V1-snapshot differ diff --git a/pom.xml b/pom.xml index 5085c460882..b9cc0ab6c8e 100644 --- a/pom.xml +++ b/pom.xml @@ -1716,6 +1716,7 @@ under the License. <exclude>**/src/test/resources/**/serializer-snapshot</exclude> <exclude>**/src/test/resources/**/test-data</exclude> <exclude>**/src/test/resources/*-snapshot</exclude> + <exclude>**/src/test/resources/**/*-snapshot</exclude> <exclude>**/src/test/resources/*.snapshot</exclude> <exclude>**/src/test/resources/*-savepoint/**</exclude> <exclude>**/src/test/resources/*-savepoint-native/**</exclude>
