This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit fc9c52f6121deab4f49e1e905dc7f50e1273525a
Author: 白鵺 <[email protected]>
AuthorDate: Thu Feb 12 07:29:38 2026 +0800

    [test] Fix flaky testUndoRecoveryMultipleKeys by checkpointing keyIndex in FailingCountingSource

    keyIndex was not persisted in checkpoint state, causing round-robin key
    distribution to shift after failover recovery, resulting in key1 receiving
    an extra record (140 instead of expected 130).
---
 .../sink/testutils/FailingCountingSource.java | 30 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/FailingCountingSource.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/FailingCountingSource.java
index b6a3e3aa9..8d29bc42e 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/FailingCountingSource.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/FailingCountingSource.java
@@ -234,11 +234,18 @@ public class FailingCountingSource
         private final int splitId;
         private final int emittedRecords;
         private final boolean hasFailed;
+        private final int keyIndex;
 
         public FailingCountingSplit(int splitId, int emittedRecords, boolean hasFailed) {
+            this(splitId, emittedRecords, hasFailed, 0);
+        }
+
+        public FailingCountingSplit(
+                int splitId, int emittedRecords, boolean hasFailed, int keyIndex) {
             this.splitId = splitId;
             this.emittedRecords = emittedRecords;
             this.hasFailed = hasFailed;
+            this.keyIndex = keyIndex;
         }
 
         @Override
@@ -258,6 +265,10 @@ public class FailingCountingSource
             return hasFailed;
         }
 
+        public int getKeyIndex() {
+            return keyIndex;
+        }
+
         @Override
         public String toString() {
             return "FailingCountingSplit{"
@@ -267,6 +278,8 @@ public class FailingCountingSource
                     + emittedRecords
                     + ", hasFailed="
                     + hasFailed
+                    + ", keyIndex="
+                    + keyIndex
                     + '}';
         }
     }
@@ -498,8 +511,9 @@ public class FailingCountingSource
 
         @Override
         public List<FailingCountingSplit> snapshotState(long checkpointId) {
-            // Checkpoint the current state including hasFailed flag
-            return Collections.singletonList(new FailingCountingSplit(0, totalEmitted, hasFailed));
+            // Checkpoint the current state including hasFailed flag and keyIndex
+            return Collections.singletonList(
+                    new FailingCountingSplit(0, totalEmitted, hasFailed, keyIndex));
         }
 
         @Override
@@ -513,6 +527,7 @@ public class FailingCountingSource
             for (FailingCountingSplit split : newSplits) {
                 this.totalEmitted = split.getEmittedRecords();
                 this.hasFailed = split.hasFailed();
+                this.keyIndex = split.getKeyIndex();
                 splits.add(split);
             }
             availableFuture.complete(null);
@@ -533,11 +548,12 @@ public class FailingCountingSource
     /**
      * Serializer for FailingCountingSplit.
      *
-     * <p>Serialization format: splitId (4 bytes) + emittedRecords (4 bytes) + hasFailed (1 byte)
+     * <p>Serialization format: splitId (4 bytes) + emittedRecords (4 bytes) + hasFailed (1 byte) +
+     * keyIndex (4 bytes)
      */
     private static class FailingCountingSplitSerializer
             implements SimpleVersionedSerializer<FailingCountingSplit> {
-        private static final int VERSION = 1;
+        private static final int VERSION = 2;
 
         @Override
         public int getVersion() {
@@ -546,10 +562,11 @@ public class FailingCountingSource
 
         @Override
         public byte[] serialize(FailingCountingSplit split) {
-            ByteBuffer buffer = ByteBuffer.allocate(9);
+            ByteBuffer buffer = ByteBuffer.allocate(13);
             buffer.putInt(split.getSplitIdInt());
             buffer.putInt(split.getEmittedRecords());
             buffer.put((byte) (split.hasFailed() ? 1 : 0));
+            buffer.putInt(split.getKeyIndex());
             return buffer.array();
         }
 
@@ -559,7 +576,8 @@ public class FailingCountingSource
             int splitId = buffer.getInt();
             int emittedRecords = buffer.getInt();
             boolean hasFailed = buffer.get() == 1;
-            return new FailingCountingSplit(splitId, emittedRecords, hasFailed);
+            int keyIndex = version >= 2 ? buffer.getInt() : 0;
+            return new FailingCountingSplit(splitId, emittedRecords, hasFailed, keyIndex);
         }
     }
