This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 978672b7249 KAFKA-19436: Restrict cache update for ongoing 
batch/offset state (#20041) (#20647)
978672b7249 is described below

commit 978672b7249c902062ce5f758d32f476fb9b56fe
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Oct 7 14:45:58 2025 +0100

    KAFKA-19436: Restrict cache update for ongoing batch/offset state (#20041) 
(#20647)
    
    Cherry-pick commit from
    https://github.com/apache/kafka/commit/96ef1c520a
    
    During stress testing it was noticed that, on acquisition lock timeout,
    some offsets were not found in the cache. A cache update can be attempted
    from different acknowledgement calls; hence, if there is an ongoing
    transition which is not yet finished and another parallel
    acknowledgement triggers the cache update, the cache can be updated
    incorrectly while the first transition is still in progress.
    
    The cache update only happens for Archived and Acknowledged records,
    hence this issue, or the existing implementation, should not hamper the
    queues functionality. But it might update the cache early when the
    persister call might still fail, or it might trigger error logs about
    offsets not found in the cache when the acquisition lock times out (in
    some scenarios).
    
    Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 24 ++++--
 .../kafka/server/share/SharePartitionTest.java     | 97 ++++++++++++++++++++++
 2 files changed, 115 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 09385d6c48c..eb0fedbe8cd 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2178,7 +2178,8 @@ public class SharePartition {
         }
     }
 
-    private boolean canMoveStartOffset() {
+    // Visible for testing.
+    boolean canMoveStartOffset() {
         // The Share Partition Start Offset may be moved after acknowledge 
request is complete.
         // The following conditions need to be met to move the startOffset:
         // 1. When the cachedState is not empty.
@@ -2203,7 +2204,15 @@ public class SharePartition {
                 "as there is an acquirable gap at the beginning. Cannot move 
the start offset.", startOffset, groupId, topicIdPartition);
             return false;
         }
-        RecordState startOffsetState = entry.getValue().offsetState == null ?
+        boolean isBatchState = entry.getValue().offsetState() == null;
+        boolean isOngoingTransition = isBatchState ?
+            entry.getValue().batchHasOngoingStateTransition() :
+            
entry.getValue().offsetState().get(startOffset).hasOngoingStateTransition();
+        if (isOngoingTransition) {
+            return false;
+        }
+
+        RecordState startOffsetState = isBatchState ?
             entry.getValue().batchState() :
             entry.getValue().offsetState().get(startOffset).state();
         return isRecordStateAcknowledged(startOffsetState);
@@ -2238,13 +2247,13 @@ public class SharePartition {
                 }
 
                 if (inFlightBatch.offsetState() == null) {
-                    if 
(!isRecordStateAcknowledged(inFlightBatch.batchState())) {
+                    if (inFlightBatch.batchHasOngoingStateTransition() || 
!isRecordStateAcknowledged(inFlightBatch.batchState())) {
                         return lastOffsetAcknowledged;
                     }
                     lastOffsetAcknowledged = inFlightBatch.lastOffset();
                 } else {
                     for (Map.Entry<Long, InFlightState> offsetState : 
inFlightBatch.offsetState.entrySet()) {
-                        if 
(!isRecordStateAcknowledged(offsetState.getValue().state())) {
+                        if (offsetState.getValue().hasOngoingStateTransition() 
|| !isRecordStateAcknowledged(offsetState.getValue().state())) {
                             return lastOffsetAcknowledged;
                         }
                         lastOffsetAcknowledged = offsetState.getKey();
@@ -2913,7 +2922,8 @@ public class SharePartition {
             return batchState;
         }
 
-        private boolean batchHasOngoingStateTransition() {
+        // Visible for testing.
+        boolean batchHasOngoingStateTransition() {
             return inFlightState().hasOngoingStateTransition();
         }
 
@@ -3034,7 +3044,8 @@ public class SharePartition {
             acquisitionLockTimeoutTask = null;
         }
 
-        private boolean hasOngoingStateTransition() {
+        // Visible for testing.
+        boolean hasOngoingStateTransition() {
             if (rollbackState == null) {
                 // This case could occur when the batch/offset hasn't 
transitioned even once or the state transitions have
                 // been committed.
@@ -3067,6 +3078,7 @@ public class SharePartition {
                 return this;
             } catch (IllegalStateException e) {
                 log.error("Failed to update state of the records", e);
+                rollbackState = null;
                 return null;
             }
         }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 0cc7ad4d9a1..2c77e98d72a 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -6479,6 +6479,103 @@ public class SharePartitionTest {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    @Test
+    public void testCacheUpdateWhenBatchHasOngoingTransition() {
+        Persister persister = Mockito.mock(Persister.class);
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+        // Acquire a single batch.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 
21,
+                fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+
+        // Validate that there is no ongoing transition.
+        
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+        // Return a future which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future = new 
CompletableFuture<>();
+        Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+        // Acknowledge batch to create ongoing transition.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.ACCEPT.id))));
+
+        // Assert the start offset has not moved and batch has ongoing 
transition.
+        assertEquals(21L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+
+        // Validate that offset can't be moved because batch has ongoing 
transition.
+        assertFalse(sharePartition.canMoveStartOffset());
+        assertEquals(-1, sharePartition.findLastOffsetAcknowledged());
+
+        // Complete the future so acknowledge API can be completed, which 
updates the cache.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        future.complete(writeShareGroupStateResult);
+
+        // Validate the cache has been updated.
+        assertEquals(31L, sharePartition.startOffset());
+        assertTrue(sharePartition.cachedState().isEmpty());
+    }
+
+    @Test
+    public void testCacheUpdateWhenOffsetStateHasOngoingTransition() {
+        Persister persister = Mockito.mock(Persister.class);
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+        // Acquire a single batch.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 
21,
+                fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+
+        // Validate that there is no ongoing transition.
+        
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+        assertNull(sharePartition.cachedState().get(21L).offsetState());
+        // Return a future which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future = new 
CompletableFuture<>();
+        Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+        // Acknowledge offsets to create ongoing transition.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(21, 23, List.of(AcknowledgeType.ACCEPT.id))));
+
+        // Assert the start offset has not moved and offset state is now 
maintained. Offset state should
+        // have ongoing transition.
+        assertEquals(21L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(21L).offsetState());
+        
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(21L).hasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(22L).hasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(23L).hasOngoingStateTransition());
+        // Only 21, 22 and 23 offsets should have ongoing state transition as 
the acknowledge request
+        // contains 21-23 offsets.
+        
assertFalse(sharePartition.cachedState().get(21L).offsetState().get(24L).hasOngoingStateTransition());
+
+        // Validate that offset can't be moved because batch has ongoing 
transition.
+        assertFalse(sharePartition.canMoveStartOffset());
+        assertEquals(-1, sharePartition.findLastOffsetAcknowledged());
+
+        // Complete the future so acknowledge API can be completed, which 
updates the cache.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        future.complete(writeShareGroupStateResult);
+
+        // Validate the cache has been updated.
+        assertEquals(24L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(21L));
+    }
+
     /**
      * Test the case where the fetch batch has first record offset greater 
than the record batch start offset.
      * Such batches can exist for compacted topics.

Reply via email to