This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3c581afdc5f8b787d829ca0c9bdacfc498084b17 Author: Roman Khachatryan <[email protected]> AuthorDate: Fri Oct 10 14:18:51 2025 +0000 [FLINK-38460] Add SinkUpsertMaterializer state growth test --- .../operators/sink/SinkUpsertMaterializerTest.java | 62 ++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java index cd92c2fe702..f8fb9754f26 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java @@ -20,8 +20,13 @@ package org.apache.flink.table.runtime.operators.sink; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StateObject; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.generated.RecordEqualiser; @@ -41,12 +46,15 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.junit.jupiter.api.Assertions.assertEquals; /** Test for {@link SinkUpsertMaterializer}. */ @RunWith(Parameterized.class) @@ -235,4 +243,58 @@ public class SinkUpsertMaterializerTest { testHarness.open(); } } + + @Test + public void testStateIsBounded() throws Exception { + int dop = 2; + int numIterations = 10; + OperatorSnapshotFinalizer[] snapshots = new OperatorSnapshotFinalizer[dop]; + long[] prevStateSizes = new long[dop]; + for (int i = 0; i < numIterations; i++) { + for (int subtask = 0; subtask < dop; subtask++) { + snapshots[subtask] = initAndSnapshot(snapshots[subtask], i); + long currentStateSize = + snapshots[subtask] + .getJobManagerOwnedState() + .getManagedOperatorState() + .stream() + .mapToLong(StateObject::getStateSize) + .sum(); + if (i > 0) { + assertEquals(prevStateSizes[subtask], currentStateSize); + } + prevStateSizes[subtask] = currentStateSize; + } + List<OperatorStateHandle> union = + Arrays.stream(snapshots) + .flatMap( + s -> + s + .getJobManagerOwnedState() + .getManagedOperatorState() + .stream()) + .collect(Collectors.toList()); + for (int j = 0; j < dop; j++) { + snapshots[j] = + new OperatorSnapshotFinalizer( + snapshots[j].getJobManagerOwnedState().toBuilder() + .setManagedOperatorState(new StateObjectCollection<>(union)) + .build(), + snapshots[j].getTaskLocalState()); + } + } + } + + private OperatorSnapshotFinalizer initAndSnapshot( + OperatorSnapshotFinalizer from, int newCheckpointID) throws Exception { + try (OneInputStreamOperatorTestHarness<RowData, RowData> harness = + createHarness( + createOperator(LOGICAL_TYPES, UPSERT_KEY), stateBackend, LOGICAL_TYPES)) { + if (from != null) { + harness.initializeState(from.getJobManagerOwnedState()); + } + harness.open(); + return harness.snapshotWithLocalState(newCheckpointID, newCheckpointID); + } + } }
