This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit fc9c52f6121deab4f49e1e905dc7f50e1273525a
Author: 白鵺 <[email protected]>
AuthorDate: Thu Feb 12 07:29:38 2026 +0800

    [test] Fix flaky testUndoRecoveryMultipleKeys by checkpointing keyIndex in FailingCountingSource
    
    keyIndex was not persisted in checkpoint state, causing round-robin key
    distribution to shift after failover recovery, resulting in key1 receiving
    an extra record (140 instead of expected 130).
---
 .../sink/testutils/FailingCountingSource.java      | 30 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/FailingCountingSource.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/FailingCountingSource.java
index b6a3e3aa9..8d29bc42e 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/FailingCountingSource.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/FailingCountingSource.java
@@ -234,11 +234,18 @@ public class FailingCountingSource
         private final int splitId;
         private final int emittedRecords;
         private final boolean hasFailed;
+        private final int keyIndex;
 
         public FailingCountingSplit(int splitId, int emittedRecords, boolean hasFailed) {
+            this(splitId, emittedRecords, hasFailed, 0);
+        }
+
+        public FailingCountingSplit(
+                int splitId, int emittedRecords, boolean hasFailed, int keyIndex) {
             this.splitId = splitId;
             this.emittedRecords = emittedRecords;
             this.hasFailed = hasFailed;
+            this.keyIndex = keyIndex;
         }
 
         @Override
@@ -258,6 +265,10 @@ public class FailingCountingSource
             return hasFailed;
         }
 
+        public int getKeyIndex() {
+            return keyIndex;
+        }
+
         @Override
         public String toString() {
             return "FailingCountingSplit{"
@@ -267,6 +278,8 @@ public class FailingCountingSource
                     + emittedRecords
                     + ", hasFailed="
                     + hasFailed
+                    + ", keyIndex="
+                    + keyIndex
                     + '}';
         }
     }
@@ -498,8 +511,9 @@ public class FailingCountingSource
 
         @Override
         public List<FailingCountingSplit> snapshotState(long checkpointId) {
-            // Checkpoint the current state including hasFailed flag
-            return Collections.singletonList(new FailingCountingSplit(0, totalEmitted, hasFailed));
+            // Checkpoint the current state including hasFailed flag and keyIndex
+            return Collections.singletonList(
+                    new FailingCountingSplit(0, totalEmitted, hasFailed, keyIndex));
         }
 
         @Override
@@ -513,6 +527,7 @@ public class FailingCountingSource
             for (FailingCountingSplit split : newSplits) {
                 this.totalEmitted = split.getEmittedRecords();
                 this.hasFailed = split.hasFailed();
+                this.keyIndex = split.getKeyIndex();
                 splits.add(split);
             }
             availableFuture.complete(null);
@@ -533,11 +548,12 @@ public class FailingCountingSource
     /**
      * Serializer for FailingCountingSplit.
      *
-     * <p>Serialization format: splitId (4 bytes) + emittedRecords (4 bytes) + hasFailed (1 byte)
+     * <p>Serialization format: splitId (4 bytes) + emittedRecords (4 bytes) + hasFailed (1 byte) +
+     * keyIndex (4 bytes)
      */
     private static class FailingCountingSplitSerializer
             implements SimpleVersionedSerializer<FailingCountingSplit> {
-        private static final int VERSION = 1;
+        private static final int VERSION = 2;
 
         @Override
         public int getVersion() {
@@ -546,10 +562,11 @@ public class FailingCountingSource
 
         @Override
         public byte[] serialize(FailingCountingSplit split) {
-            ByteBuffer buffer = ByteBuffer.allocate(9);
+            ByteBuffer buffer = ByteBuffer.allocate(13);
             buffer.putInt(split.getSplitIdInt());
             buffer.putInt(split.getEmittedRecords());
             buffer.put((byte) (split.hasFailed() ? 1 : 0));
+            buffer.putInt(split.getKeyIndex());
             return buffer.array();
         }
 
@@ -559,7 +576,8 @@ public class FailingCountingSource
             int splitId = buffer.getInt();
             int emittedRecords = buffer.getInt();
             boolean hasFailed = buffer.get() == 1;
-            return new FailingCountingSplit(splitId, emittedRecords, hasFailed);
+            int keyIndex = version >= 2 ? buffer.getInt() : 0;
+            return new FailingCountingSplit(splitId, emittedRecords, hasFailed, keyIndex);
         }
     }
 

Reply via email to