This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6eebb863a03 MINOR: Fix off-by-one in ShareCoordinatorShard snapshot
update count (#21426)
6eebb863a03 is described below
commit 6eebb863a0335615b0dd491c8875e79a062d4756
Author: majialong <[email protected]>
AuthorDate: Wed Mar 4 23:29:21 2026 +0800
MINOR: Fix off-by-one in ShareCoordinatorShard snapshot update count
(#21426)
Fix off-by-one in `ShareCoordinatorShard#handleShareUpdate` where the
first update replay initialized `snapshotUpdateCount` to 0 instead of 1,
causing limit + 1 update records before a snapshot instead of the
configured limit.
Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield
<[email protected]>
---
.../coordinator/share/ShareCoordinatorShard.java | 2 +-
.../share/ShareCoordinatorShardTest.java | 40 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 970cf561498..9eb4ea3f0da 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -279,7 +279,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
// This is an incremental snapshot,
// so we need to apply it to our current soft state.
shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord :
merge(v, value));
- snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
+ snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 1 : v + 1);
}
private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int
leaderEpoch) {
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 37da082007c..ecdbb991554 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -70,6 +70,7 @@ import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -881,6 +882,45 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard(),
times(5)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
+ @Test
+ public void testSnapshotUpdateCountBoundary() {
+ shard = new ShareCoordinatorShardBuilder()
+
.setConfigOverrides(Map.of(ShareCoordinatorConfig.SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG,
"2"))
+ .build();
+
+ initSharePartition(shard, SHARE_PARTITION_KEY);
+
+ WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setStartOffset(0)
+ .setDeliveryCompleteCount(0)
+ .setStateEpoch(0)
+ .setLeaderEpoch(0)
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0)))))));
+
+ // Write 1: update count 0 < limit 2, should produce update record.
+ CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>
result = shard.writeState(request);
+ assertInstanceOf(ShareUpdateKey.class, result.records().get(0).key());
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ // Write 2: update count 1 < limit 2, should produce update record.
+ result = shard.writeState(request);
+ assertInstanceOf(ShareUpdateKey.class, result.records().get(0).key());
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ // Write 3: update count 2 >= limit 2, should produce snapshot record.
+ result = shard.writeState(request);
+ assertInstanceOf(ShareSnapshotKey.class,
result.records().get(0).key());
+ }
+
@Test
public void testLastRedundantOffset() {
ShareCoordinatorOffsetsManager manager =
mock(ShareCoordinatorOffsetsManager.class);