This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b017faddd3 MINOR: Correcting the throttling condition for batch in 
Share Partition (#20978)
7b017faddd3 is described below

commit 7b017faddd366aea3f8a8196818235f8821e26d4
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Nov 24 21:52:39 2025 +0000

    MINOR: Correcting the throttling condition for batch in Share Partition 
(#20978)
    
    The PR corrects the throttling condition so that batches and offsets
    with an ongoing state transition are not considered. Tests have been
    added and corrected to verify the behaviour.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  14 ++-
 .../kafka/server/share/SharePartitionTest.java     | 117 ++++++++++++++++++++-
 2 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 78ab7c29897..ba3a142cf0c 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2027,7 +2027,14 @@ public class SharePartition {
      */
     private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, 
long requestFirstOffset, long requestLastOffset) {
         if (inFlightBatch.offsetState() == null) {
-            return inFlightBatch.batchDeliveryCount() >= 
throttleRecordsDeliveryLimit;
+            // If offsetState is null, it means the batch is not split and 
represents a single batch.
+            // Check if the batch is in AVAILABLE state and has no ongoing 
transition.
+            // The requested batch shall always be within the request first 
and last offset as the sub
+            // map batches are only fetched to consider.
+            if (inFlightBatch.batchState() == RecordState.AVAILABLE && 
!inFlightBatch.batchHasOngoingStateTransition()) {
+                return inFlightBatch.batchDeliveryCount() >= 
throttleRecordsDeliveryLimit;
+            }
+            return false;
         }
 
         return inFlightBatch.offsetState().entrySet().stream().filter(entry -> 
{
@@ -2037,10 +2044,7 @@ public class SharePartition {
             if (entry.getKey() > requestLastOffset) {
                 return false;
             }
-            if (entry.getValue().state() != RecordState.AVAILABLE) {
-                return false;
-            }
-            return true;
+            return entry.getValue().state() == RecordState.AVAILABLE && 
!entry.getValue().hasOngoingStateTransition();
         }).mapToInt(entry -> entry.getValue().deliveryCount()).max().orElse(0) 
>= throttleRecordsDeliveryLimit;
     }
 
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 73c0242b81f..8e8240377e6 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -11221,7 +11221,7 @@ public class SharePartitionTest {
                 PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
                     List.of(
                         new PersisterStateBatch(15L, 19L, 
RecordState.AVAILABLE.id, (short) 2),
-                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 3),
                         new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -11610,6 +11610,121 @@ public class SharePartitionTest {
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
     }
 
+    @Test
+    public void testAcquisitionThrottlingWithOngoingStateTransition() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 15L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 19L, 
RecordState.AVAILABLE.id, (short) 1),
+                        // Batch of 20-24 has been set to delivery count of 2 
so in next acquisition it will be 3,
+                        // and post that it should be throttled but because of 
pending state transition it
+                        // should not be throttled.
+                        new PersisterStateBatch(20L, 24L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(25L, 29L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(30L, 34L, 
RecordState.AVAILABLE.id, (short) 2),
+                        // Similarly, batch of 35-39 has been set to delivery 
count of 2 so in next offset
+                        // acquisition, some offsets will be at 3 delivery 
count, and post that offsets
+                        // should be throttled but because of pending state 
transition they will not be throttled.
+                        new PersisterStateBatch(35, 39L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(40, 44L, 
RecordState.ARCHIVED.id, (short) 5),
+                        new PersisterStateBatch(45, 49L, 
RecordState.AVAILABLE.id, (short) 1)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        // Acquire batches 20-24 and 36-37 (offset based) and create a pending 
state transition.
+        fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5);
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(20L).batchState());
+        fetchAcquiredRecords(sharePartition, memoryRecords(36, 2), 2);
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(35L).offsetState().get(35L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(35L).offsetState().get(36L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(35L).offsetState().get(37L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(35L).offsetState().get(38L).state());
+
+        // Create a pending future which will block state updates.
+        CompletableFuture<WriteShareGroupStateResult> future = new 
CompletableFuture<>();
+        Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+
+        // Release batch of 20-24 and offset 36-37, which will have pending 
state transition.
+        sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(20, 24, 
List.of(AcknowledgeType.RELEASE.id)),
+                    new ShareAcknowledgementBatch(36, 37, 
List.of(AcknowledgeType.RELEASE.id))));
+
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(35L).offsetState().get(36L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(35L).offsetState().get(37L).state());
+
+        
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
+        
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(35L).hasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(36L).hasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(37L).hasOngoingStateTransition());
+        
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(38L).hasOngoingStateTransition());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        memoryRecordsBuilder(buffer, 20, 5).close();
+        memoryRecordsBuilder(buffer, 25, 5).close();
+        memoryRecordsBuilder(buffer, 30, 5).close();
+        memoryRecordsBuilder(buffer, 35, 5).close();
+        memoryRecordsBuilder(buffer, 40, 5).close();
+        memoryRecordsBuilder(buffer, 45, 5).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Acquire batches and batch 15-19, 25-29 will be acquired as batch 
20-24 has pending state transition.
+        // Without pending transition, the acquisition would have happened 
only for 20-24 batch as the batch
+        // 20-24 would have marked to be throttled but eventually couldn't be 
acquired because of state transition.
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                10,
+                15,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        assertEquals(7, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(20L).batchState());
+        
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(25L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(30L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(35L).offsetState().get(35L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(35L).offsetState().get(36L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(35L).offsetState().get(37L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(35L).offsetState().get(38L).state());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(40L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(45L).batchState());
+
+        // Re-trigger the acquisition and rest all the records will be 
acquired, including the offsets
+        // ones. The throttling should not happen because of pending state 
transition.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                15,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            13);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(30, 34, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(35, 35, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 38, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(39, 39, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(45, 49, 2));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+    }
+
     /**
      * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
      */

Reply via email to