This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4d1a611850b1604de8a4fa40a9b10320bf6ab902 Author: Roman Khachatryan <[email protected]> AuthorDate: Tue Sep 30 10:44:25 2025 +0200 [FLINK-38460] Add SinkUpsertMaterializerRescalingTest --- .../sink/SinkUpsertMaterializerRescalingTest.java | 353 +++++++++++++++++++++ 1 file changed, 353 insertions(+) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java new file mode 100644 index 00000000000..9d8990264bc --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerRescalingTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.SavepointType; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.HashFunction; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.runtime.util.StateConfigUtil; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import org.apache.flink.types.RowKind; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.function.ToIntFunction; +import java.util.stream.Collectors; + +import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;

/**
 * Rescaling and migration unit tests for {@link SinkUpsertMaterializer}.
 *
 * <p>Every test runs the same scenario: process records with the initial parallelism/backend,
 * snapshot, restore with the target parallelism/backend, process more records, snapshot again,
 * restore with the initial parallelism/backend and verify that the materializer still retracts
 * rows in the expected order.
 */
@RunWith(Parameterized.class)
public class SinkUpsertMaterializerRescalingTest {

    /** State backend the scenario starts from (and returns to); injected by the runner. */
    @Parameter public SinkUpsertMaterializerStateBackend backend;

    /** Supplies one parameter set per {@link SinkUpsertMaterializerStateBackend} value. */
    @Parameterized.Parameters(name = "stateBackend={0}")
    public static Object[][] generateTestParameters() {
        List<Object[]> result = new ArrayList<>();
        for (SinkUpsertMaterializerStateBackend backend :
                SinkUpsertMaterializerStateBackend.values()) {
            result.add(new Object[] {backend});
        }
        return result.toArray(new Object[0][]);
    }

    @Test
    public void testScaleUpThenDown() throws Exception {
        testRescaleFromToFrom(10, 2, 3, backend, backend);
    }

    @Test
    public void testScaleDownThenUp() throws Exception {
        testRescaleFromToFrom(10, 3, 2, backend, backend);
    }

    @Test
    public void testRecovery() throws Exception {
        testRescaleFromToFrom(1, 1, 1, backend, backend);
    }

    @Test
    public void testForwardAndBackwardMigration() throws Exception {
        testRescaleFromToFrom(7, 3, 3, backend, getOtherBackend(backend));
    }

    @Test
    public void testScaleUpThenDownWithMigration() throws Exception {
        testRescaleFromToFrom(7, 1, 5, backend, getOtherBackend(backend));
    }

    @Test
    public void testScaleDownThenUpWithMigration() throws Exception {
        // Derive the target from the parameterized backend so that every parameter value
        // really switches backends (a fixed argument would yield the same backend twice for
        // one of the parameter values).
        testRescaleFromToFrom(7, 5, 1, backend, getOtherBackend(backend));
    }

    /** Returns ROCKSDB for HEAP and HEAP for anything else. */
    private SinkUpsertMaterializerStateBackend getOtherBackend(
            SinkUpsertMaterializerStateBackend backend) {
        return backend == SinkUpsertMaterializerStateBackend.HEAP
                ? SinkUpsertMaterializerStateBackend.ROCKSDB
                : SinkUpsertMaterializerStateBackend.HEAP;
    }

    /**
     * Runs the from -> to -> from scenario.
     *
     * <p>All records used here carry the same value in the key column (index 1, see {@link
     * #KEY_SELECTOR}), so they are always routed to a single subtask; which one depends on the
     * current parallelism.
     *
     * @param maxParallelism max parallelism used for key-group assignment; also the capacity of
     *     the harness arrays, so both parallelisms must not exceed it
     * @param fromParallelism parallelism of the first and the last phase
     * @param toParallelism parallelism of the middle phase
     * @param fromBackend state backend of the first and the last phase
     * @param toBackend state backend of the middle phase
     * @throws Exception if a harness fails to process, snapshot, restore or close
     */
    @SuppressWarnings("unchecked")
    private void testRescaleFromToFrom(
            final int maxParallelism,
            final int fromParallelism,
            final int toParallelism,
            final SinkUpsertMaterializerStateBackend fromBackend,
            final SinkUpsertMaterializerStateBackend toBackend)
            throws Exception {

        // Single-element array so the routing lambda below can observe parallelism changes.
        int[] currentParallelismRef = new int[] {fromParallelism};

        // A canonical savepoint is taken whenever the backend changes between phases
        // (presumably because backend-native checkpoints are not portable - confirm).
        boolean useSavepoint = fromBackend != toBackend;

        OneInputStreamOperator<RowData, RowData>[] materializers =
                new OneInputStreamOperator[maxParallelism];
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses =
                new KeyedOneInputStreamOperatorTestHarness[maxParallelism];

        // Sends a record to the harness owning its key and returns that harness' index.
        final ToIntFunction<StreamRecord<RowData>> combinedHarnesses =
                (r) -> {
                    try {
                        int subtaskIndex =
                                KeyGroupRangeAssignment.assignKeyToParallelOperator(
                                        KEY_SELECTOR.getKey(r.getValue()),
                                        maxParallelism,
                                        currentParallelismRef[0]);

                        harnesses[subtaskIndex].processElement(r);
                        return subtaskIndex;
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                };

        try {
            // Phase 1: initial parallelism and backend, empty state.
            initHarnessesAndMaterializers(
                    harnesses, materializers, fromBackend, maxParallelism, fromParallelism, null);

            int idx = combinedHarnesses.applyAsInt(insertRecord(1L, 1, "a1"));
            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 1L, 1, "a1"));

            idx = combinedHarnesses.applyAsInt(insertRecord(2L, 1, "a2"));
            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));

            List<OperatorSubtaskState> subtaskStates =
                    snapshotHarnesses(harnesses, fromParallelism, 1L, useSavepoint);

            // Phase 2: target parallelism and backend, restored from the first snapshot.
            currentParallelismRef[0] = toParallelism;
            initHarnessesAndMaterializers(
                    harnesses,
                    materializers,
                    toBackend,
                    maxParallelism,
                    toParallelism,
                    subtaskStates);

            idx = combinedHarnesses.applyAsInt(insertRecord(3L, 1, "a3"));
            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));

            idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4"));
            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 4L, 1, "a4"));

            subtaskStates = snapshotHarnesses(harnesses, toParallelism, 2L, useSavepoint);

            // Phase 3: back to the initial parallelism and backend, restored from the second
            // snapshot.
            currentParallelismRef[0] = fromParallelism;
            initHarnessesAndMaterializers(
                    harnesses,
                    materializers,
                    fromBackend,
                    maxParallelism,
                    fromParallelism,
                    subtaskStates);

            // Deleting the latest row re-emits the previous one.
            idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));

            // Deleting a row that is not the latest one changes nothing downstream.
            idx = combinedHarnesses.applyAsInt(deleteRecord(2L, 1, "a2"));
            ASSERTOR.shouldEmitNothing(harnesses[idx]);

            idx = combinedHarnesses.applyAsInt(deleteRecord(3L, 1, "a3"));
            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));

            idx = combinedHarnesses.applyAsInt(deleteRecord(1L, 1, "a1"));
            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.DELETE, 1L, 1, "a1"));

            idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4"));
            ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 4L, 1, "a4"));

            // Harness time was set to 1 on init and TTL_CONFIG is created with 1000, so moving
            // to 1002 is expected to expire the state written above; the delete then finds
            // nothing to retract.
            Arrays.stream(harnesses)
                    .filter(Objects::nonNull)
                    .forEach(h -> h.setStateTtlProcessingTime(1002));

            idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
            ASSERTOR.shouldEmitNothing(harnesses[idx]);
        } finally {
            // Release operators and state backends even if an assertion failed.
            closeHarnesses(harnesses);
        }
    }

    /**
     * (Re-)creates {@code parallelism} materializers and harnesses in slots {@code [0,
     * parallelism)} of the given arrays, optionally restoring them from a previous snapshot.
     *
     * <p>Harnesses left over from a previous phase are closed and all slots are cleared first,
     * so nothing is leaked when a slot is overwritten and no stale harness survives a scale-down.
     * The snapshot passed in must therefore have been taken before this call.
     *
     * @param harnesses array receiving the harnesses; its length must be at least {@code
     *     parallelism}
     * @param materializers array receiving the operators; same length requirement
     * @param backend state backend to configure on every harness
     * @param maxParallelism max parallelism passed to every harness
     * @param parallelism number of subtasks to create
     * @param subtaskStates per-subtask state of the previous phase (its size is used as the old
     *     parallelism), or {@code null} to start with empty state
     * @throws Exception if closing an old harness or setting up a new one fails
     */
    private void initHarnessesAndMaterializers(
            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses,
            OneInputStreamOperator<RowData, RowData>[] materializers,
            SinkUpsertMaterializerStateBackend backend,
            int maxParallelism,
            int parallelism,
            @Nullable List<OperatorSubtaskState> subtaskStates)
            throws Exception {
        closeHarnesses(harnesses);
        Arrays.fill(materializers, null);

        // The merged state is identical for every new subtask, so build it only once.
        final OperatorSubtaskState repackagedState =
                subtaskStates == null
                        ? null
                        : AbstractStreamOperatorTestHarness.repackageState(
                                subtaskStates.toArray(new OperatorSubtaskState[0]));

        for (int i = 0; i < parallelism; ++i) {
            materializers[i] =
                    SinkUpsertMaterializer.create(
                            TTL_CONFIG,
                            RowType.of(LOGICAL_TYPES),
                            EQUALISER,
                            UPSERT_KEY_EQUALISER,
                            null);
            harnesses[i] =
                    new KeyedOneInputStreamOperatorTestHarness<>(
                            materializers[i],
                            KEY_SELECTOR,
                            KEY_SELECTOR.getProducedType(),
                            maxParallelism,
                            parallelism,
                            i);

            harnesses[i].setStateBackend(backend.create(false));

            if (repackagedState != null) {
                harnesses[i].initializeState(
                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
                                repackagedState,
                                maxParallelism,
                                subtaskStates.size(),
                                parallelism,
                                i));
            }

            harnesses[i].open();
            harnesses[i].setStateTtlProcessingTime(1);
        }
    }

    /**
     * Closes every non-null harness and clears its slot.
     *
     * <p>All harnesses are attempted even if one of them fails; the first failure is rethrown
     * with later ones attached as suppressed exceptions.
     */
    private static void closeHarnesses(
            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses)
            throws Exception {
        Exception failure = null;
        for (int i = 0; i < harnesses.length; i++) {
            if (harnesses[i] == null) {
                continue;
            }
            try {
                harnesses[i].close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            } finally {
                harnesses[i] = null;
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Snapshots the first {@code parallelism} harnesses and returns their job-manager-owned
     * state, in subtask order.
     *
     * @param harnesses harnesses to snapshot; slots {@code [0, parallelism)} must be populated
     * @param parallelism number of leading harnesses to snapshot
     * @param checkpointId id passed to each snapshot
     * @param useSavepoint {@code true} to take a canonical-format savepoint, {@code false} for a
     *     regular checkpoint
     * @return one {@link OperatorSubtaskState} per snapshotted harness
     */
    private List<OperatorSubtaskState> snapshotHarnesses(
            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses,
            int parallelism,
            long checkpointId,
            boolean useSavepoint) {
        return Arrays.stream(harnesses, 0, parallelism)
                .map(
                        h -> {
                            try {
                                return h.snapshotWithLocalState(
                                                checkpointId,
                                                0L,
                                                useSavepoint
                                                        ? SavepointType.savepoint(
                                                                SavepointFormatType.CANONICAL)
                                                        : CheckpointType.CHECKPOINT)
                                        .getJobManagerOwnedState();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        })
                .collect(Collectors.toList());
    }

    /** Test equalizer for records: compares row kind and all three fields. */
    protected static class TestRecordEqualiser implements RecordEqualiser, HashFunction {
        @Override
        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind()
                    && row1.getLong(0) == row2.getLong(0)
                    && row1.getInt(1) == row2.getInt(1)
                    && row1.getString(2).equals(row2.getString(2));
        }

        @Override
        public int hashCode(Object data) {
            RowData rd = (RowData) data;
            return Objects.hash(rd.getRowKind(), rd.getLong(0), rd.getInt(1), rd.getString(2));
        }
    }

    /** Test equalizer for upsert keys: compares row kind and the first (BIGINT) field only. */
    protected static class TestUpsertKeyEqualiser implements RecordEqualiser, HashFunction {
        @Override
        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
        }

        @Override
        public int hashCode(Object data) {
            RowData rd = (RowData) data;
            return Objects.hash(rd.getRowKind(), rd.getLong(0));
        }
    }

    /** Hands out {@link TestRecordEqualiser} instances instead of compiling generated code. */
    private static class MyGeneratedRecordEqualiser extends GeneratedRecordEqualiser {

        public MyGeneratedRecordEqualiser() {
            super("", "", new Object[0]);
        }

        @Override
        public RecordEqualiser newInstance(ClassLoader classLoader) {
            return new TestRecordEqualiser();
        }
    }

    private static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig(1000);

    /** Row layout used by all records in this test: (BIGINT, INT, VARCHAR). */
    private static final LogicalType[] LOGICAL_TYPES =
            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};

    /** Keys records by the INT column at index 1. */
    private static final RowDataKeySelector KEY_SELECTOR =
            HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, LOGICAL_TYPES);

    private static final RowDataHarnessAssertor ASSERTOR =
            new RowDataHarnessAssertor(LOGICAL_TYPES);

    private static final GeneratedRecordEqualiser EQUALISER = new MyGeneratedRecordEqualiser();

    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
            new GeneratedRecordEqualiser("", "", new Object[0]) {

                @Override
                public RecordEqualiser newInstance(ClassLoader classLoader) {
                    return new TestUpsertKeyEqualiser();
                }
            };
}
