This is an automated email from the ASF dual-hosted git repository.

rkhachatryan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f08eb4345004b1422d100fccdee9667804122ad2
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Mar 13 13:08:50 2026 +0100

    [FLINK-39740][table/runtime] Fix highSqn update in LinkedMultiSetState.add
---
 .../linked/LinkedMultiSetState.java                |  2 +-
 .../SequencedMultiSetStateTest.java                | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java
index 840fe007ebf..801bab4a973 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java
@@ -185,8 +185,8 @@ public class LinkedMultiSetState implements SequencedMultiSetState<RowData> {
                 isNewRowKey
                         ? new Node(row, newSqn, highSqn, null, null, timestamp)
                         : sqnToNodeState.get(oldSqn).withRow(row, timestamp));
-        highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize));
         if (isNewRowKey) {
+            highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize));
             rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn));
             if (!isNewContextKey) {
                 sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn));
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
index b68638115cf..38f3c826d21 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java
@@ -229,6 +229,34 @@ public class SequencedMultiSetStateTest {
                 });
     }
 
+    /** Test that replacing a non-tail row preserves the ability to add new rows afterwards. */
+    @TestTemplate
+    public void testAddAfterReplacingNonTail() throws Exception {
+        runTest(
+                state -> {
+                    state.add(row("k1", "v1"), 1L);
+                    state.add(row("k2", "v2"), 2L);
+                    state.add(row("k3", "v3"), 3L);
+
+                    // replace k1 (not the tail) - should not corrupt highSqn
+                    state.add(row("k1", "v1-updated"), 4L);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("k1", "v1-updated"), 4L),
+                            Tuple2.of(row("k2", "v2"), 2L),
+                            Tuple2.of(row("k3", "v3"), 3L));
+
+                    // adding a new key after the replace should work correctly
+                    state.add(row("k4", "v4"), 5L);
+                    assertStateContents(
+                            state,
+                            Tuple2.of(row("k1", "v1-updated"), 4L),
+                            Tuple2.of(row("k2", "v2"), 2L),
+                            Tuple2.of(row("k3", "v3"), 3L),
+                            Tuple2.of(row("k4", "v4"), 5L));
+                });
+    }
+
     @TestTemplate
     public void testAddAfterRemovingTail() throws Exception {
         runTest(

Reply via email to