This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6eebb863a03 MINOR: Fix off-by-one in ShareCoordinatorShard snapshot update count (#21426)
6eebb863a03 is described below

commit 6eebb863a0335615b0dd491c8875e79a062d4756
Author: majialong <[email protected]>
AuthorDate: Wed Mar 4 23:29:21 2026 +0800

    MINOR: Fix off-by-one in ShareCoordinatorShard snapshot update count (#21426)
    
    Fix off-by-one in `ShareCoordinatorShard#handleShareUpdate` where the
    first update replay initialized `snapshotUpdateCount` to 0 instead of 1,
    causing limit + 1 update records before a snapshot instead of the
    configured limit.
    
    Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield
    <[email protected]>
---
 .../coordinator/share/ShareCoordinatorShard.java   |  2 +-
 .../share/ShareCoordinatorShardTest.java           | 40 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 970cf561498..9eb4ea3f0da 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -279,7 +279,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         // This is an incremental snapshot,
         // so we need to apply it to our current soft state.
         shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord : merge(v, value));
-        snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
+        snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 1 : v + 1);
     }
 
     private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int leaderEpoch) {
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 37da082007c..ecdbb991554 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -70,6 +70,7 @@ import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -881,6 +882,45 @@ class ShareCoordinatorShardTest {
         verify(shard.getMetricsShard(), times(5)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testSnapshotUpdateCountBoundary() {
+        shard = new ShareCoordinatorShardBuilder()
+            .setConfigOverrides(Map.of(ShareCoordinatorConfig.SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, "2"))
+            .build();
+
+        initSharePartition(shard, SHARE_PARTITION_KEY);
+
+        WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setDeliveryCompleteCount(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 1)
+                        .setDeliveryState((byte) 0)))))));
+
+        // Write 1: update count 0 < limit 2, should produce update record.
+        CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> result = shard.writeState(request);
+        assertInstanceOf(ShareUpdateKey.class, result.records().get(0).key());
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        // Write 2: update count 1 < limit 2, should produce update record.
+        result = shard.writeState(request);
+        assertInstanceOf(ShareUpdateKey.class, result.records().get(0).key());
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        // Write 3: update count 2 >= limit 2, should produce snapshot record.
+        result = shard.writeState(request);
+        assertInstanceOf(ShareSnapshotKey.class, result.records().get(0).key());
+    }
+
     @Test
     public void testLastRedundantOffset() {
         ShareCoordinatorOffsetsManager manager = mock(ShareCoordinatorOffsetsManager.class);
