This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3c581afdc5f8b787d829ca0c9bdacfc498084b17
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Oct 10 14:18:51 2025 +0000

    [FLINK-38460] Add SinkUpsertMaterializer state growth test
---
 .../operators/sink/SinkUpsertMaterializerTest.java | 62 ++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index cd92c2fe702..f8fb9754f26 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -20,8 +20,13 @@ package org.apache.flink.table.runtime.operators.sink;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
@@ -41,12 +46,15 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Test for {@link SinkUpsertMaterializer}. */
 @RunWith(Parameterized.class)
@@ -235,4 +243,58 @@ public class SinkUpsertMaterializerTest {
             testHarness.open();
         }
     }
+
+    @Test
+    public void testStateIsBounded() throws Exception { // state size must not change per round
+        int dop = 2; // number of subtasks simulated in each round
+        int numIterations = 10; // rounds; the round index is also used as checkpoint ID
+        OperatorSnapshotFinalizer[] snapshots = new OperatorSnapshotFinalizer[dop];
+        long[] prevStateSizes = new long[dop];
+        for (int i = 0; i < numIterations; i++) {
+            for (int subtask = 0; subtask < dop; subtask++) {
+                snapshots[subtask] = initAndSnapshot(snapshots[subtask], i);
+                long currentStateSize =
+                        snapshots[subtask]
+                                .getJobManagerOwnedState()
+                                .getManagedOperatorState()
+                                .stream()
+                                .mapToLong(StateObject::getStateSize)
+                                .sum();
+                if (i > 0) { // the first round has no previous size to compare with
+                    assertEquals(prevStateSizes[subtask], currentStateSize);
+                }
+                prevStateSizes[subtask] = currentStateSize;
+            }
+            List<OperatorStateHandle> union = // managed operator state of all subtasks
+                    Arrays.stream(snapshots)
+                            .flatMap(
+                                    s ->
+                                            s
+                                                    .getJobManagerOwnedState()
+                                                    .getManagedOperatorState()
+                                                    .stream())
+                            .collect(Collectors.toList());
+            for (int j = 0; j < dop; j++) { // every subtask restores from the combined handles
+                snapshots[j] =
+                        new OperatorSnapshotFinalizer(
+                                snapshots[j].getJobManagerOwnedState().toBuilder()
+                                        .setManagedOperatorState(new StateObjectCollection<>(union))
+                                        .build(),
+                                snapshots[j].getTaskLocalState());
+            }
+        }
+    }
+
+    private OperatorSnapshotFinalizer initAndSnapshot( // optional restore, open, then snapshot
+            OperatorSnapshotFinalizer from, int newCheckpointID) throws Exception {
+        try (OneInputStreamOperatorTestHarness<RowData, RowData> harness = // closed on return
+                createHarness(
+                        createOperator(LOGICAL_TYPES, UPSERT_KEY), stateBackend, LOGICAL_TYPES)) {
+            if (from != null) { // null: start without restoring any previous snapshot
+                harness.initializeState(from.getJobManagerOwnedState());
+            }
+            harness.open();
+            return harness.snapshotWithLocalState(newCheckpointID, newCheckpointID);
+        }
+    }
 }

Reply via email to