This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bcd3191c792 KAFKA-17541:[2/2] Improve handling of delivery count 
(#20837)
bcd3191c792 is described below

commit bcd3191c792b90e1d17a0c3094ef7fe6b8655c53
Author: Lan Ding <[email protected]>
AuthorDate: Fri Nov 21 19:31:48 2025 +0800

    KAFKA-17541:[2/2] Improve handling of delivery count (#20837)
    
    For records with a delivery count exceeding 2, there is a suspicion that
    delivery failures stem from underlying issues rather than natural
    retry scenarios. The batching of such records should be reduced.
    
    Solution: Determining which offset is bad is not possible at the broker's
    end, but the broker can restrict the acquired records to a subset so that
    only the bad record is skipped. We can do the following:
    
    - If the delivery count of a batch is >= 3, then acquire only 1/2 of the
    batch records, i.e., for a batch of 0-499 (500 records), if the batch
    delivery count is 3, start offset tracking and acquire 0-249 (250 records).
    - If the delivery count is bumped again, keep acquiring 1/2 of the
    previously acquired offsets until the last delivery attempt, i.e., 0-124
    (125 records).
    - For the last delivery attempt, acquire only 1 offset, so that only the
    bad record will be skipped.
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
     <[email protected]>, Abhinav Dixit <[email protected]>
    
    ---------
    
    Co-authored-by: d00791190 <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  90 ++++-
 .../kafka/server/share/SharePartitionTest.java     | 443 ++++++++++++++++++++-
 2 files changed, 509 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 73b6a863aa3..cb268e81e93 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -106,6 +106,11 @@ public class SharePartition {
 
     private static final Logger log = 
LoggerFactory.getLogger(SharePartition.class);
 
+    /**
+     * Minimum number of records to deliver when throttling
+     */
+    private static final int MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT = 2;
+
     /**
      * empty member id used to indicate when a record is not acquired by any 
member.
      */
@@ -201,6 +206,11 @@ public class SharePartition {
      */
     private final int maxDeliveryCount;
 
+    /**
+     * Records whose delivery count exceeds this are deemed abnormal and the 
batching of these records
+     * should be reduced. The limit is set to half of maxDeliveryCount rounded 
up, with a minimum of 2.
+     */
+    private final int throttleRecordsDeliveryLimit;
     /**
      * The group config manager is used to retrieve the values for dynamic 
group configurations
      */
@@ -362,6 +372,7 @@ public class SharePartition {
         this.leaderEpoch = leaderEpoch;
         this.maxInFlightRecords = maxInFlightRecords;
         this.maxDeliveryCount = maxDeliveryCount;
+        this.throttleRecordsDeliveryLimit = 
Math.max(MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT, (int) Math.ceil((double) 
maxDeliveryCount / 2));
         this.cachedState = new ConcurrentSkipListMap<>();
         this.lock = new ReentrantReadWriteLock();
         this.findNextFetchOffset = false;
@@ -834,7 +845,16 @@ public class SharePartition {
                 boolean fullMatch = checkForFullMatch(inFlightBatch, 
firstBatch.baseOffset(), lastOffsetToAcquire);
                 int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
                 boolean recordLimitSubsetMatch = isRecordLimitMode && 
checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, 
acquiredCount);
-                if (!fullMatch || inFlightBatch.offsetState() != null || 
recordLimitSubsetMatch) {
+                boolean throttleRecordsDelivery = 
shouldThrottleRecordsDelivery(inFlightBatch, firstBatch.baseOffset(), 
lastOffsetToAcquire);
+                // Stop acquiring more records if records delivery has to be 
throttled. The throttling prevents
+                // complete batch to be archived in case of a single record 
being corrupt.
+                // Below check isolates the current batch/offsets to be 
delivered individually in subsequent fetches.
+                if (throttleRecordsDelivery && acquiredCount > 0) {
+                    // Set the max records to acquire as 0 to prevent further 
acquisition of records.
+                    maxRecordsToAcquire = 0;
+                    break;
+                }
+                if (!fullMatch || inFlightBatch.offsetState() != null || 
recordLimitSubsetMatch || throttleRecordsDelivery) {
                     log.trace("Subset or offset tracked batch record found for 
share partition,"
                             + " batch: {} request offsets - first: {}, last: 
{} for the share"
                             + " partition: {}-{}", inFlightBatch, 
firstBatch.baseOffset(),
@@ -859,7 +879,14 @@ public class SharePartition {
                     // maxRecordsToAcquire. Hence, pass the remaining number 
of records that can
                     // be acquired.
                     int acquiredSubsetCount = 
acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, 
firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+
                     acquiredCount += acquiredSubsetCount;
+                    // If records are throttled, return immediately and set 
`maxRecordsToAcquire = 0`
+                    // to prevent acquiring any new records afterwards.
+                    if (throttleRecordsDelivery && acquiredSubsetCount > 0) {
+                        maxRecordsToAcquire = 0;
+                        break;
+                    }
                     continue;
                 }
 
@@ -1866,6 +1893,8 @@ public class SharePartition {
     ) {
         lock.writeLock().lock();
         int acquiredCount = 0;
+        long maxFetchRecordsWhileThrottledRecords = -1;
+        boolean hasThrottledRecord = false;
         try {
             for (Map.Entry<Long, InFlightState> offsetState : 
inFlightBatch.offsetState().entrySet()) {
                 // For the first batch which might have offsets prior to the 
request base
@@ -1885,7 +1914,29 @@ public class SharePartition {
                     continue;
                 }
 
-                InFlightState updateResult =  
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE,
+                int recordDeliveryCount = 
offsetState.getValue().deliveryCount();
+                // If the record is on last delivery attempt then isolate that 
record to be delivered alone.
+                // If the respective record is corrupt then it prevents 
increasing delivery count of multiple
+                // records in a single response batch. Condition below checks 
if the current record has reached
+                // the delivery limit and already have some records to return 
in response then skip processing
+                // the current record, which shall be delivered alone in next 
fetch.
+                if (maxDeliveryCount > 2 && recordDeliveryCount == 
maxDeliveryCount - 1 && acquiredCount > 0) {
+                    break;
+                }
+
+                // When record delivery count reach the throttle threshold, 
progressively reduce batch size to isolate records.
+                // The `maxFetchRecordsWhileThrottledRecords` is halved with 
each additional delivery attempt beyond the throttle limit.
+                // Example:
+                //   - maxDeliveryCount = 6, throttleRecordsDeliveryLimit = 3, 
batch size = 500
+                //   - deliveryCount = 3: maxFetchRecords = 500 >> (3 - 3 + 1) 
= 250
+                //   - deliveryCount = 4: maxFetchRecords = 500 >> (4 - 3 + 1) 
= 125
+                // The `maxFetchRecordsWhileThrottledRecords` is calculated 
based on the first acquirable record that meets the throttling criteria in the 
batch.
+                if (recordDeliveryCount >= throttleRecordsDeliveryLimit && 
maxFetchRecordsWhileThrottledRecords < 0) {
+                    maxFetchRecordsWhileThrottledRecords = Math.max(1, (long) 
inFlightBatch.offsetState().size() >> (recordDeliveryCount - 
throttleRecordsDeliveryLimit + 1));
+                    hasThrottledRecord = true;
+                }
+
+                InFlightState updateResult = 
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE,
                     maxDeliveryCount, memberId);
                 if (updateResult == null || updateResult.state() != 
RecordState.ACQUIRED) {
                     log.trace("Unable to acquire records for the offset: {} in 
batch: {}"
@@ -1904,10 +1955,18 @@ public class SharePartition {
                     .setLastOffset(offsetState.getKey())
                     .setDeliveryCount((short) 
offsetState.getValue().deliveryCount()));
                 acquiredCount++;
+
+                // Delivered alone.
+                if (offsetState.getValue().deliveryCount() == maxDeliveryCount 
&& maxDeliveryCount > 2) {
+                    break;
+                }
                 if (isRecordLimitMode && acquiredCount == maxFetchRecords) {
                     // In record_limit mode, acquire only the requested number 
of records.
                     break;
                 }
+                if (hasThrottledRecord && acquiredCount == 
maxFetchRecordsWhileThrottledRecords) {
+                    break;
+                }
             }
         } finally {
             lock.writeLock().unlock();
@@ -1942,6 +2001,33 @@ public class SharePartition {
         return batchFirstOffset < localStartOffset && batchLastOffset >= 
localStartOffset;
     }
 
+    /**
+     * Check if the in-flight batch should be throttled based on delivery 
count.
+     *
+     * @param inFlightBatch       The in-flight batch to check for throttling.
+     * @param requestFirstOffset  The first offset to acquire.
+     * @param requestLastOffset   THe last offset to acquire.
+     * @return True if the batch should be throttled (delivery count >= 
threshold), false otherwise.
+     */
+    private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, 
long requestFirstOffset, long requestLastOffset) {
+        if (inFlightBatch.offsetState() == null) {
+            return inFlightBatch.batchDeliveryCount() >= 
throttleRecordsDeliveryLimit;
+        }
+
+        return inFlightBatch.offsetState().entrySet().stream().filter(entry -> 
{
+            if (entry.getKey() < requestFirstOffset) {
+                return false;
+            }
+            if (entry.getKey() > requestLastOffset) {
+                return false;
+            }
+            if (entry.getValue().state() != RecordState.AVAILABLE) {
+                return false;
+            }
+            return true;
+        }).mapToInt(entry -> entry.getValue().deliveryCount()).max().orElse(0) 
>= throttleRecordsDeliveryLimit;
+    }
+
     // Visibility for test
     static Map<Long, Byte> fetchAckTypeMapForBatch(ShareAcknowledgementBatch 
batch) {
         // Client can either send a single entry in acknowledgeTypes which 
represents the state
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 047fffa5171..c8be18c027e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -96,6 +96,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
 import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
@@ -1118,7 +1119,7 @@ public class SharePartitionTest {
                     List.of(
                         new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
                         new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
-                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 1)))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
@@ -1186,7 +1187,7 @@ public class SharePartitionTest {
         List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15, 18, 3));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 2));
 
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
         assertEquals(31, sharePartition.nextFetchOffset());
@@ -1241,7 +1242,7 @@ public class SharePartitionTest {
                     List.of(
                         new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
                         new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
-                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 1)))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
@@ -1284,7 +1285,7 @@ public class SharePartitionTest {
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 2));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1));
 
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
@@ -1322,7 +1323,7 @@ public class SharePartitionTest {
                     List.of(
                         new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
                         new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
-                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 1)))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
@@ -1365,7 +1366,7 @@ public class SharePartitionTest {
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2));
 
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
         assertEquals(28, sharePartition.nextFetchOffset());
@@ -1404,7 +1405,7 @@ public class SharePartitionTest {
                     List.of(
                         new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
                         new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
-                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 1)))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
@@ -1475,7 +1476,7 @@ public class SharePartitionTest {
                     List.of(
                         new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
                         new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
-                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 1)))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
@@ -1621,7 +1622,7 @@ public class SharePartitionTest {
                 FETCH_ISOLATION_HWM),
             24);
 
-        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26, 
30, 4));
+        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26, 
30, 2));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1));
 
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
@@ -1647,7 +1648,7 @@ public class SharePartitionTest {
                     List.of(
                         new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
                         new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
-                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 1)))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
@@ -1697,7 +1698,7 @@ public class SharePartitionTest {
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2));
 
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
         assertEquals(28, sharePartition.nextFetchOffset());
@@ -1736,7 +1737,7 @@ public class SharePartitionTest {
                     List.of(
                         new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
                         new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
-                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 1)))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
@@ -4428,12 +4429,12 @@ public class SharePartitionTest {
         // Allowing acquisition lock to expire to archive the records that 
reach max delivery count.
         mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
         TestUtils.waitForCondition(
-                () -> sharePartition.timer().size() == 0 &&
-                        sharePartition.nextFetchOffset() == 0 &&
-                        // After the second delivery attempt fails to 
acknowledge the record correctly, the record should be archived.
-                        sharePartition.cachedState().get(10L).batchState() == 
RecordState.ARCHIVED &&
-                        
sharePartition.cachedState().get(10L).batchDeliveryCount() == 2 &&
-                        
sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
+            () -> sharePartition.timer().size() == 0 &&
+                    sharePartition.nextFetchOffset() == 0 &&
+                    // After the second delivery attempt fails to acknowledge 
the record correctly, the record should be archived.
+                    sharePartition.cachedState().get(10L).batchState() == 
RecordState.ARCHIVED &&
+                    sharePartition.cachedState().get(10L).batchDeliveryCount() 
== 2 &&
+                    
sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
                 DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
                 () -> assertionFailedMessage(sharePartition, Map.of(10L, 
List.of())));
         // After the acquisition lock expires for the second time, the records 
should be archived as the max delivery count is reached.
@@ -7782,7 +7783,7 @@ public class SharePartitionTest {
             new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
                 PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
                     List.of(
-                        new PersisterStateBatch(11L, 20L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(11L, 20L, 
RecordState.AVAILABLE.id, (short) 1),
                         new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21 to 30
                     ))))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
@@ -8083,11 +8084,11 @@ public class SharePartitionTest {
 
         fetchAcquiredRecords(sharePartition, records, 10);
         sharePartition.acknowledge(MEMBER_ID, List.of(
-                new ShareAcknowledgementBatch(5, 14, 
List.of(AcknowledgeType.RELEASE.id))));
+            new ShareAcknowledgementBatch(5, 14, 
List.of(AcknowledgeType.RELEASE.id))));
 
         fetchAcquiredRecords(sharePartition, records, 10);
         sharePartition.acknowledge(MEMBER_ID, List.of(
-                new ShareAcknowledgementBatch(5, 14, 
List.of(AcknowledgeType.RELEASE.id))));
+            new ShareAcknowledgementBatch(5, 14, 
List.of(AcknowledgeType.RELEASE.id))));
 
         // All the records in the batch reached the max delivery count, hence 
they got archived and the cached state cleared.
         assertEquals(15, sharePartition.nextFetchOffset());
@@ -8121,7 +8122,7 @@ public class SharePartitionTest {
         fetchAcquiredRecords(sharePartition, records2, 2);
 
         sharePartition.acknowledge(MEMBER_ID, List.of(
-                new ShareAcknowledgementBatch(13, 16, 
List.of(AcknowledgeType.RELEASE.id))));
+            new ShareAcknowledgementBatch(13, 16, 
List.of(AcknowledgeType.RELEASE.id))));
 
         assertEquals(20, sharePartition.nextFetchOffset());
         // Cached state will be empty because after the second release, the 
acquired records will now have moved to
@@ -11191,6 +11192,404 @@ public class SharePartitionTest {
         Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
     }
 
+    @Test
+    public void testThrottleRecordsWhenPendingDeliveriesExist() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 19L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 7, 13).close();
+        memoryRecordsBuilder(buffer, 20, 8).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Set max fetch records to 500, records should be acquired till the 
offset 26 of the fetched batch.
+        // 16 records should be returned: 7-19, 23-25
+        // The record at offset 26 has a delivery count of 3 and is a subject 
to be throttled; it should be skipped.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            16);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(26, sharePartition.nextFetchOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        assertEquals(3, sharePartition.deliveryCompleteCount());
+
+        // The record at offset 26 has a delivery count of 3 and is a subject 
to be throttled;
+        // First acquisition attempt fails: batch size should be halved (5 -> 
2)
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                26,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            2);
+
+        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26, 
26, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(27, 27, 4));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+    }
+
+    @Test
+    public void 
testAcquireRecordsHalvesBatchSizeOnEachFailureUntilSingleRecordOnLastAttempt() 
throws InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 34L, 
RecordState.AVAILABLE.id, (short) 4)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withMaxDeliveryCount(7)
+            
.withDefaultAcquisitionLockTimeoutMs(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS)
+            .build();
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(34, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // The record at offset 15 has a delivery count of 4 and is a subject 
to be throttled
+        // First acquisition attempt fails: batch size should be halved (20 -> 
10)
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            10);
+        final List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15, 15, 5));
+        IntStream.range(1, 10).forEach(i -> 
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15 + i, 15 + i, 5)));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+
+        // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 
1);
+
+        // Second failure: batch size halved again (now ~1/4 of original, 20 
-> 5)
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            5);
+        final List<AcquiredRecords> expectedAcquiredRecords2 = new 
ArrayList<>(expectedAcquiredRecord(15, 15, 6));
+        IntStream.range(1, 5).forEach(i -> 
expectedAcquiredRecords2.addAll(expectedAcquiredRecord(15 + i, 15 + i, 6)));
+        assertArrayEquals(expectedAcquiredRecords2.toArray(), 
acquiredRecordsList.toArray());
+
+        // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 
1);
+
+        List<AcquiredRecords> expectedLastAttemptAcquiredRecords;
+        // Last delivery attempt, records are delivered individually.
+        for (int i = 0; i < 5; i++) {
+            acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                    MEMBER_ID,
+                    ShareAcquireMode.BATCH_OPTIMIZED,
+                    BATCH_SIZE,
+                    500,
+                    5,
+                    fetchPartitionData(records),
+                    FETCH_ISOLATION_HWM),
+                1);
+            expectedLastAttemptAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15 + i, 15 + i, 7));
+            assertArrayEquals(expectedLastAttemptAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        }
+
+        // The record at offset 20 has a delivery count of 5 and is a subject 
to be throttled;
+        // Second acquisition attempt fails: batch size should be ~1/4 of 
original, 20 -> 5
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            5);
+        final List<AcquiredRecords> expectedAcquiredRecords3 = new 
ArrayList<>(expectedAcquiredRecord(20, 20, 6));
+        IntStream.range(1, 5).forEach(i -> 
expectedAcquiredRecords3.addAll(expectedAcquiredRecord(20 + i, 20 + i, 6)));
+        assertArrayEquals(expectedAcquiredRecords3.toArray(), 
acquiredRecordsList.toArray());
+
+        // The record at offset 25 has a delivery count of 4 and is a subject 
to be throttled;
+        // First acquisition attempt fails: batch size should be ~1/2 of 
original, 20 -> 10
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            10);
+        final List<AcquiredRecords> expectedAcquiredRecords4 = new 
ArrayList<>(expectedAcquiredRecord(25, 25, 5));
+        IntStream.range(1, 10).forEach(i -> 
expectedAcquiredRecords4.addAll(expectedAcquiredRecord(25 + i, 25 + i, 5)));
+        assertArrayEquals(expectedAcquiredRecords4.toArray(), 
acquiredRecordsList.toArray());
+    }
+
+    /**
+     * Verifies that when records of one batch end up with different delivery counts, the record whose
+     * next delivery is its last attempt (offset 17 here) is acquired on its own, while the records
+     * before it (offsets 15 and 16) are acquired together without it.
+     */
+    @Test
+    public void testLastAttemptRecordIsolationWithMixedDeliveryCount() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 15L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 34L, 
RecordState.AVAILABLE.id, (short) 2)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(15, sharePartition.startOffset());
+        assertEquals(34, sharePartition.endOffset());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Acquire all 20 records; the persisted delivery count of 2 is bumped to 3.
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                15,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            20);
+
+        // Release only offsets 17 and 20; the rest of the batch stays acquired.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(17, 17, 
List.of(AcknowledgeType.RELEASE.id)),
+                new ShareAcknowledgementBatch(20, 20, 
List.of(AcknowledgeType.RELEASE.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        // Re-acquire the two released offsets (17 and 20), bumping their delivery count once more.
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                15,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            2);
+
+        // Release the whole batch (offsets 15-34), leaving offset 17 with a higher delivery count than its neighbours.
+        ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(15, 34, 
List.of(AcknowledgeType.RELEASE.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).offsetState().get(15L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).offsetState().get(16L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).offsetState().get(17L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).offsetState().get(18L).state());
+        assertEquals(3, 
sharePartition.cachedState().get(15L).offsetState().get(15L).deliveryCount());
+        assertEquals(3, 
sharePartition.cachedState().get(15L).offsetState().get(16L).deliveryCount());
+        assertEquals(4, 
sharePartition.cachedState().get(15L).offsetState().get(17L).deliveryCount());
+        assertEquals(3, 
sharePartition.cachedState().get(15L).offsetState().get(18L).deliveryCount());
+
+        // The record at offset 17 (delivery count of 4) is on its last 
attempt and should be delivered alone,
+        // so this acquisition correctly stops at offset 16.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            2);
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15, 15, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(16, 16, 4));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+
+        // The record at offset 17 should be delivered alone, now with delivery count 5.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            1);
+        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(17, 
17, 5));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+    }
+
+    /**
+     * Verifies that acquisition is not throttled when the only record with a high delivery count
+     * (offset 15, still held in ACQUIRED state) is not part of the records being acquired: the
+     * remaining offsets 16-19 are all acquired together in a single call.
+     */
+    @Test
+    public void 
testAcquisitionNotThrottledIfHighDeliveryCountRecordNotAcquired() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 15L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 19L, 
RecordState.AVAILABLE.id, (short) 1)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(15, sharePartition.startOffset());
+        assertEquals(19, sharePartition.endOffset());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Acquire all 5 records; the persisted delivery count of 1 is bumped to 2.
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                15,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            5);
+
+        // Release only offset 15 (the first record); offsets 16-19 stay acquired.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(15, 15, 
List.of(AcknowledgeType.RELEASE.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        // Re-acquire offset 15 alone, bumping its delivery count to 3.
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                15,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            1);
+
+        // Release the remaining offsets 16-19 while offset 15 stays acquired.
+        ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(16, 19, 
List.of(AcknowledgeType.RELEASE.id))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).offsetState().get(15L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).offsetState().get(16L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).offsetState().get(17L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).offsetState().get(18L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).offsetState().get(19L).state());
+        assertEquals(3, 
sharePartition.cachedState().get(15L).offsetState().get(15L).deliveryCount());
+        assertEquals(2, 
sharePartition.cachedState().get(15L).offsetState().get(16L).deliveryCount());
+        assertEquals(2, 
sharePartition.cachedState().get(15L).offsetState().get(17L).deliveryCount());
+        assertEquals(2, 
sharePartition.cachedState().get(15L).offsetState().get(18L).deliveryCount());
+        assertEquals(2, 
sharePartition.cachedState().get(15L).offsetState().get(19L).deliveryCount());
+
+        // This acquisition should not be throttled, as the 
high-delivery-count record (offset 15) was not acquired.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.BATCH_OPTIMIZED,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            4);
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(16, 16, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(17, 17, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(18, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 3));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+    }
+
     /**
      * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
      */


Reply via email to