This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 6ef617e93e4 KAFKA-19903: Integration tests for share group throttled 
delivery (#20953) (#20971)
6ef617e93e4 is described below

commit 6ef617e93e4e82aae311886bffad45703499580e
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Nov 24 12:58:02 2025 +0000

    KAFKA-19903: Integration tests for share group throttled delivery (#20953) 
(#20971)
    
    Integartion tests for share group throttled delivery when delivery count
    is reaching the max delivery limit.
    
    The changes were introduced in the PR:
    https://github.com/apache/kafka/pull/20837
    
    Reviewers: Andrew Schofield <[email protected]>, Lan Ding
     <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 379 +++++++++++++++++++++
 .../java/kafka/server/share/SharePartition.java    |  11 +-
 2 files changed, 388 insertions(+), 2 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 5ce121fc602..435735467d1 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -3435,6 +3435,356 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testFetchWithThrottledDelivery() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            // Produce a batch of 100 messages
+            for (int i = 0; i < 100; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message 
" + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Fetch records in 5 iterations, each time acknowledging with 
RELEASE. 5 is the default
+            // delivery limit hence we should see throttling from 
Math.ceil(5/2) = 3 fetches.
+            int throttleDeliveryLimit = 3;
+            for (int i = 0; i < 5; i++) {
+                // Adjust expected fetch count based on throttling. If i < 
throttleDeliveryLimit, we get full batch of 100.
+                // If i == 4 i.e. the last delivery, then we get 1 record.
+                // Otherwise, we get half the previous fetch count due to 
throttling. In this case, 100 >> (i - throttleDeliveryLimit + 1) it is 50 for 
i=3.
+                int expectedFetchCount = (i < throttleDeliveryLimit) ? 100 : 
((i == 4) ? 1 : 50);
+                ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, expectedFetchCount);
+                assertEquals(expectedFetchCount, records.count());
+
+                records.forEach(record -> shareConsumer.acknowledge(record, 
AcknowledgeType.RELEASE));
+                Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+                assertEquals(1, result.size());
+                assertEquals(Optional.empty(),
+                    result.get(new TopicIdPartition(tpId, tp.partition(), 
tp.topic())));
+            }
+
+            // Offset 0 has already reached the delivery limit hence shall be 
archived.
+            // Offset 1 to 49 shall be in last delivery attempt and hence 1 
record per poll.
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 1, 50, 
1);
+            // Delivery limit 4.
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 50, 
100, 50);
+            // Delivery limit 5.
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 50, 
100, 1);
+            // Next poll should not have any records as all records have 
reached delivery limit.
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(2500L));
+            assertTrue(records.isEmpty(), "Records should be empty as all 
records have reached delivery limit. But received: " + records.count());
+        }
+    }
+
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(key = "group.share.delivery.count.limit", 
value = "10"),
+        }
+    )
+    public void 
testFetchWithThrottledDeliveryBatchesWithIncreasedDeliveryLimit() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            // Produce records in complete power of 2 to fully test the 
throttling behavior.
+            int producedMessageCount = 512;
+            // Produce a batch of 512 messages
+            for (int i = 0; i < producedMessageCount; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(),
+                    null, "key".getBytes(), ("Message " + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            // Map which defines expected fetch count for each delivery 
attempt from 1 to 10.
+            Map<Integer, Integer> expectedFetchCountMap = Map.of(1, 512, 2, 
512, 3, 512, 4, 512, 5, 512,
+                6, 256, 7, 128, 8, 64, 9, 32, 10, 1);
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Fetch records in 10 iterations, each time acknowledging with 
RELEASE. 10 is the
+            // delivery limit hence we should see throttling from 
Math.ceil(10/2) = 5 fetches.
+            for (int i = 0; i < 10; i++) {
+                int expectedFetchCount = expectedFetchCountMap.get(i + 1);
+                ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, expectedFetchCount);
+                assertEquals(expectedFetchCount, records.count());
+                // Acknowledge all records with RELEASE.
+                records.forEach(record -> shareConsumer.acknowledge(record, 
AcknowledgeType.RELEASE));
+                Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+                assertEquals(1, result.size());
+                assertEquals(Optional.empty(), result.get(new 
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+            }
+
+            // Offset 0 is already verified above, so start from offset 1 and 
as it's last delivery cycle
+            // hence expectedRecords is 1 for each poll till offset 32.
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 1, 32, 
1);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 32, 64, 
32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 32, 64, 
1);
+            // Delivery 8
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 64, 
128, 64);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 64, 96, 
32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 64, 96, 
1);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 96, 
128, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 96, 
128, 1);
+            // Delivery 7
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 128, 
256, 128);
+            // Delivery 8
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 128, 
192, 64);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 128, 
160, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 128, 
160, 1);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 160, 
192, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 160, 
192, 1);
+            // Delivery 8
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 192, 
256, 64);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 192, 
224, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 192, 
224, 1);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 224, 
256, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 224, 
256, 1);
+            // Delivery 6
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256, 
512, 256);
+            // Delivery 7
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256, 
384, 128);
+            // Delivery 8
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256, 
320, 64);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256, 
288, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256, 
288, 1);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 288, 
320, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 288, 
320, 1);
+            // Delivery 8
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 320, 
384, 64);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 320, 
352, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 320, 
352, 1);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 352, 
384, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 352, 
384, 1);
+            // Delivery 7
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 384, 
512, 128);
+            // Delivery 8
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 384, 
448, 64);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 384, 
416, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 384, 
416, 1);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 416, 
448, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 416, 
448, 1);
+            // Delivery 8
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 448, 
512, 64);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 448, 
480, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 448, 
480, 1);
+            // Delivery 9
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 480, 
512, 32);
+            // Delivery 10
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 480, 
512, 1);
+            // Next poll should not have any records as all records have 
reached delivery limit.
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(2500L));
+            assertTrue(records.isEmpty(), "Records should be empty as all 
records have reached delivery limit. But received: " + records.count());
+        }
+    }
+
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(key = "group.share.delivery.count.limit", 
value = "10"),
+        }
+    )
+    public void testFetchWithThrottledDeliveryValidateDeliveryCount() throws 
InterruptedException {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            int producedMessageCount = 500;
+            // Produce a batch of 500 messages
+            for (int i = 0; i < producedMessageCount; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(),
+                    null, "key".getBytes(), ("Message " + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            // Map to track delivery count for each offset.
+            Map<Long, Integer> offsetToDeliveryCountMap = new HashMap<>();
+            // Map which defines expected fetch count for each delivery 
attempt from 1 to 10.
+            Map<Integer, Integer> expectedFetchCountMap = Map.of(1, 500, 2, 
500, 3, 500, 4, 500, 5, 500,
+                6, 250, 7, 125, 8, 62, 9, 31, 10, 1);
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Fetch records in 10 iterations, each time acknowledging with 
RELEASE. 10 is the
+            // delivery limit hence we should see throttling from 
Math.ceil(10/2) = 5 fetches.
+            for (int i = 0; i < 10; i++) {
+                int expectedFetchCount = expectedFetchCountMap.get(i + 1);
+                ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, expectedFetchCount);
+                assertEquals(expectedFetchCount, records.count());
+                // Update delivery count for each offset.
+                records.forEach(record -> {
+                    if 
(!offsetToDeliveryCountMap.containsKey(record.offset())) {
+                        offsetToDeliveryCountMap.put(record.offset(), 1);
+                    } else  {
+                        offsetToDeliveryCountMap.put(record.offset(), 
offsetToDeliveryCountMap.get(record.offset()) + 1);
+                    }
+                });
+                // Acknowledge with RELEASE.
+                records.forEach(record -> shareConsumer.acknowledge(record, 
AcknowledgeType.RELEASE));
+                Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+                assertEquals(1, result.size());
+                assertEquals(Optional.empty(), result.get(new 
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+            }
+
+            // Validate every offset is delivered at most till delivery limit.
+            waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(2500L));
+                if (!records.isEmpty()) {
+                    records.forEach(record -> {
+                        if 
(!offsetToDeliveryCountMap.containsKey(record.offset())) {
+                            offsetToDeliveryCountMap.put(record.offset(), 1);
+                        } else  {
+                            offsetToDeliveryCountMap.put(record.offset(), 
offsetToDeliveryCountMap.get(record.offset()) + 1);
+                        }
+                    });
+                    records.forEach(record -> 
shareConsumer.acknowledge(record, AcknowledgeType.RELEASE));
+                    Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+                    assertEquals(1, result.size());
+                    assertEquals(Optional.empty(),
+                        result.get(new TopicIdPartition(tpId, tp.partition(), 
tp.topic())));
+                }
+                return offsetToDeliveryCountMap.size() == 500 &&
+                    
offsetToDeliveryCountMap.values().stream().allMatch(deliveryCount -> 
deliveryCount == 10);
+                },
+                120000L, // 120 seconds.
+                50L,
+                () -> "failed to get records till delivery limit"
+            );
+
+            // Next poll should not have any records as all records have 
reached delivery limit.
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(2500L));
+            assertTrue(records.isEmpty(), "Records should be empty as all 
records have reached delivery limit. But received: " + records.count());
+        }
+    }
+
+    @ClusterTest(
+        serverProperties = {
+            @ClusterConfigProperty(key = "group.share.delivery.count.limit", 
value = "2"),
+        }
+    )
+    public void 
testFetchWithThrottledDeliveryBatchesWithDecreasedDeliveryLimit() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                Map.of(
+                    ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT,
+                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 512
+                ))
+        ) {
+            // Produce records in complete power of 2 to fully test the 
throttling behavior.
+            int producedMessageCount = 512;
+            // Produce a batch of 512 messages
+            for (int i = 0; i < producedMessageCount; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(),
+                    null, "key".getBytes(), ("Message " + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            shareConsumer.subscribe(List.of(tp.topic()));
+            // Fetch records in 2 iterations, each time acknowledging with 
RELEASE. As throttling
+            // currently applies for delivery limit > 2, hence we should get 
full batch in both fetches.
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 0, 512, 
512);
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer, 0, 512, 
512);
+            // Next poll should not have any records as all records have 
reached delivery limit.
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(2500L));
+            assertTrue(records.isEmpty(), "Records should be empty as all 
records have reached delivery limit. But received: " + records.count());
+        }
+    }
+
+    @ClusterTest
+    public void testFetchWithThrottledDeliveryBatchesMultipleConsumers() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(
+                "group1",
+                Map.of(
+                    ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT,
+                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1
+                ));
+            ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(
+                "group1",
+                Map.of(
+                    ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT,
+                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2
+                ))
+        ) {
+            // Produce 2 records in separate batches.
+            for (int i = 0; i < 2; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(),
+                    null, "key".getBytes(), ("Message " + i).getBytes());
+                producer.send(record);
+                // Flush immediately to create 2 different batches.
+                producer.flush();
+            }
+
+            shareConsumer1.subscribe(List.of(tp.topic()));
+            shareConsumer2.subscribe(List.of(tp.topic()));
+            // Fetch from consumer1 - should get 1 record as 
max.poll.records=1.
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer1, 2500L, 1);
+            assertEquals(1, records.count());
+            // Verify the first offset of the fetched records.
+            assertEquals(0, records.iterator().next().offset());
+            // Fetch from consumer2 - should get 1 record as offset 0 is 
Acquired by consumer1.
+            // Release the record from consumer2 after fetching until the last 
delivery attempt.
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer2, 1, 2, 
1);
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer2, 1, 2, 
1);
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer2, 1, 2, 
1);
+            validateExpectedRecordsInEachPollAndRelease(shareConsumer2, 1, 2, 
1);
+
+            // Now release the record from consumer1. Fetch again from 
consumer2 to verify it gets the released record.
+            // And should only get 1 record at offset 0 as offset 1 record is 
in final delivery attempt.
+            records.forEach(record -> shareConsumer1.acknowledge(record, 
AcknowledgeType.RELEASE));
+            Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer1.commitSync();
+            assertEquals(1, result.size());
+            assertEquals(Optional.empty(), result.get(new 
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+            // Fetch from consumer2 - should get the released record at offset 
0. Accept the record after fetching.
+            validateExpectedRecordsInEachPollAndAcknowledge(shareConsumer2, 0, 
1, 1, AcknowledgeType.ACCEPT);
+            // Now fetch the last record at offset 1 from consumer2 in its 
final delivery attempt.
+            validateExpectedRecordsInEachPollAndAcknowledge(shareConsumer2, 1, 
2, 1, AcknowledgeType.ACCEPT);
+
+            // Next poll from consumer1 should not have any records as all 
records have reached delivery limit.
+            records = shareConsumer1.poll(Duration.ofMillis(2500L));
+            assertTrue(records.isEmpty(), "Records should be empty as all 
records have reached delivery limit. But received: " + records.count());
+        }
+    }
+
     /**
      * Util class to encapsulate state for a consumer/producer
      * being executed by an {@link ExecutorService}.
@@ -3922,6 +4272,35 @@ public class ShareConsumerTest {
         }
     }
 
+    private void validateExpectedRecordsInEachPollAndRelease(
+        ShareConsumer<byte[], byte[]> shareConsumer,
+        int startOffset,
+        int lastOffset,
+        int expectedRecordsInEachPoll
+    ) {
+        validateExpectedRecordsInEachPollAndAcknowledge(shareConsumer, 
startOffset, lastOffset, expectedRecordsInEachPoll, AcknowledgeType.RELEASE);
+    }
+
+    private void validateExpectedRecordsInEachPollAndAcknowledge(
+        ShareConsumer<byte[], byte[]> shareConsumer,
+        int startOffset,
+        int lastOffset,
+        int expectedRecordsInEachPoll,
+        AcknowledgeType acknowledgeType
+    ) {
+        for (int i = startOffset; i < lastOffset; i = i + 
expectedRecordsInEachPoll) {
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, expectedRecordsInEachPoll);
+            assertEquals(expectedRecordsInEachPoll, records.count());
+            // Verify the first offset of the fetched records.
+            assertEquals(i, records.iterator().next().offset());
+
+            records.forEach(record -> shareConsumer.acknowledge(record, 
acknowledgeType));
+            Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+            assertEquals(1, result.size());
+            assertEquals(Optional.empty(), result.get(new 
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+        }
+    }
+
     private void waitForAssignment(String groupId, List<TopicPartition> tps) {
         try {
             waitForCondition(() -> {
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index cb268e81e93..6f7041b92c0 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1930,7 +1930,14 @@ public class SharePartition {
                 //   - maxDeliveryCount = 6, throttleRecordsDeliveryLimit = 3, 
batch size = 500
                 //   - deliveryCount = 3: maxFetchRecords = 500 >> (3 - 3 + 1) 
= 250
                 //   - deliveryCount = 4: maxFetchRecords = 500 >> (4 - 3 + 1) 
= 125
-                // The `maxFetchRecordsWhileThrottledRecords` is calculated 
based on the first acquirable record that meets the throttling criteria in the 
batch.
+                // The `maxFetchRecordsWhileThrottledRecords` is calculated 
based on the first acquirable
+                // record that meets the throttling criteria in the batch. 
Generally, when complete
+                // batch is erroring out and increases delivery count then the 
front offset can have
+                // higher delivery count then the later offsets, for example 
for a batch of 500 records
+                // and delivery limit of 10 the division will happen as 250, 
125, 62, 31, 1 which means
+                // offset 124 will be at higher delivery count than offset 
125. The calculation below
+                // deliver 124 once alone and then proceed with 125 onwards. 
The code ensures that
+                // the records at higher delivery count are isolated first.
                 if (recordDeliveryCount >= throttleRecordsDeliveryLimit && 
maxFetchRecordsWhileThrottledRecords < 0) {
                     maxFetchRecordsWhileThrottledRecords = Math.max(1, (long) 
inFlightBatch.offsetState().size() >> (recordDeliveryCount - 
throttleRecordsDeliveryLimit + 1));
                     hasThrottledRecord = true;
@@ -2006,7 +2013,7 @@ public class SharePartition {
      *
      * @param inFlightBatch       The in-flight batch to check for throttling.
      * @param requestFirstOffset  The first offset to acquire.
-     * @param requestLastOffset   THe last offset to acquire.
+     * @param requestLastOffset   The last offset to acquire.
      * @return True if the batch should be throttled (delivery count >= 
threshold), false otherwise.
      */
     private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch, 
long requestFirstOffset, long requestLastOffset) {

Reply via email to