This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 6ef617e93e4 KAFKA-19903: Integration tests for share group throttled
delivery (#20953) (#20971)
6ef617e93e4 is described below
commit 6ef617e93e4e82aae311886bffad45703499580e
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Nov 24 12:58:02 2025 +0000
KAFKA-19903: Integration tests for share group throttled delivery (#20953)
(#20971)
Integartion tests for share group throttled delivery when delivery count
is reaching the max delivery limit.
The changes were introduced in the PR:
https://github.com/apache/kafka/pull/20837
Reviewers: Andrew Schofield <[email protected]>, Lan Ding
<[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 379 +++++++++++++++++++++
.../java/kafka/server/share/SharePartition.java | 11 +-
2 files changed, 388 insertions(+), 2 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 5ce121fc602..435735467d1 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -3435,6 +3435,356 @@ public class ShareConsumerTest {
}
}
+ @ClusterTest
+ public void testFetchWithThrottledDelivery() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ // Produce a batch of 100 messages
+ for (int i = 0; i < 100; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), ("Message
" + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Fetch records in 5 iterations, each time acknowledging with
RELEASE. 5 is the default
+ // delivery limit hence we should see throttling from
Math.ceil(5/2) = 3 fetches.
+ int throttleDeliveryLimit = 3;
+ for (int i = 0; i < 5; i++) {
+ // Adjust expected fetch count based on throttling. If i <
throttleDeliveryLimit, we get full batch of 100.
+ // If i == 4 i.e. the last delivery, then we get 1 record.
+ // Otherwise, we get half the previous fetch count due to
throttling. In this case, 100 >> (i - throttleDeliveryLimit + 1) it is 50 for
i=3.
+ int expectedFetchCount = (i < throttleDeliveryLimit) ? 100 :
((i == 4) ? 1 : 50);
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, expectedFetchCount);
+ assertEquals(expectedFetchCount, records.count());
+
+ records.forEach(record -> shareConsumer.acknowledge(record,
AcknowledgeType.RELEASE));
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ assertEquals(Optional.empty(),
+ result.get(new TopicIdPartition(tpId, tp.partition(),
tp.topic())));
+ }
+
+ // Offset 0 has already reached the delivery limit hence shall be
archived.
+ // Offset 1 to 49 shall be in last delivery attempt and hence 1
record per poll.
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 1, 50,
1);
+ // Delivery limit 4.
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 50,
100, 50);
+ // Delivery limit 5.
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 50,
100, 1);
+ // Next poll should not have any records as all records have
reached delivery limit.
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(2500L));
+ assertTrue(records.isEmpty(), "Records should be empty as all
records have reached delivery limit. But received: " + records.count());
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key = "group.share.delivery.count.limit",
value = "10"),
+ }
+ )
+ public void
testFetchWithThrottledDeliveryBatchesWithIncreasedDeliveryLimit() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ // Produce records in complete power of 2 to fully test the
throttling behavior.
+ int producedMessageCount = 512;
+ // Produce a batch of 512 messages
+ for (int i = 0; i < producedMessageCount; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(),
+ null, "key".getBytes(), ("Message " + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ // Map which defines expected fetch count for each delivery
attempt from 1 to 10.
+ Map<Integer, Integer> expectedFetchCountMap = Map.of(1, 512, 2,
512, 3, 512, 4, 512, 5, 512,
+ 6, 256, 7, 128, 8, 64, 9, 32, 10, 1);
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Fetch records in 10 iterations, each time acknowledging with
RELEASE. 10 is the
+ // delivery limit hence we should see throttling from
Math.ceil(10/2) = 5 fetches.
+ for (int i = 0; i < 10; i++) {
+ int expectedFetchCount = expectedFetchCountMap.get(i + 1);
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, expectedFetchCount);
+ assertEquals(expectedFetchCount, records.count());
+ // Acknowledge all records with RELEASE.
+ records.forEach(record -> shareConsumer.acknowledge(record,
AcknowledgeType.RELEASE));
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ assertEquals(Optional.empty(), result.get(new
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+ }
+
+ // Offset 0 is already verified above, so start from offset 1 and
as it's last delivery cycle
+ // hence expectedRecords is 1 for each poll till offset 32.
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 1, 32,
1);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 32, 64,
32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 32, 64,
1);
+ // Delivery 8
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 64,
128, 64);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 64, 96,
32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 64, 96,
1);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 96,
128, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 96,
128, 1);
+ // Delivery 7
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 128,
256, 128);
+ // Delivery 8
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 128,
192, 64);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 128,
160, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 128,
160, 1);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 160,
192, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 160,
192, 1);
+ // Delivery 8
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 192,
256, 64);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 192,
224, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 192,
224, 1);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 224,
256, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 224,
256, 1);
+ // Delivery 6
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256,
512, 256);
+ // Delivery 7
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256,
384, 128);
+ // Delivery 8
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256,
320, 64);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256,
288, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 256,
288, 1);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 288,
320, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 288,
320, 1);
+ // Delivery 8
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 320,
384, 64);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 320,
352, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 320,
352, 1);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 352,
384, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 352,
384, 1);
+ // Delivery 7
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 384,
512, 128);
+ // Delivery 8
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 384,
448, 64);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 384,
416, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 384,
416, 1);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 416,
448, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 416,
448, 1);
+ // Delivery 8
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 448,
512, 64);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 448,
480, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 448,
480, 1);
+ // Delivery 9
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 480,
512, 32);
+ // Delivery 10
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 480,
512, 1);
+ // Next poll should not have any records as all records have
reached delivery limit.
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(2500L));
+ assertTrue(records.isEmpty(), "Records should be empty as all
records have reached delivery limit. But received: " + records.count());
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key = "group.share.delivery.count.limit",
value = "10"),
+ }
+ )
+ public void testFetchWithThrottledDeliveryValidateDeliveryCount() throws
InterruptedException {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ int producedMessageCount = 500;
+ // Produce a batch of 500 messages
+ for (int i = 0; i < producedMessageCount; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(),
+ null, "key".getBytes(), ("Message " + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ // Map to track delivery count for each offset.
+ Map<Long, Integer> offsetToDeliveryCountMap = new HashMap<>();
+ // Map which defines expected fetch count for each delivery
attempt from 1 to 10.
+ Map<Integer, Integer> expectedFetchCountMap = Map.of(1, 500, 2,
500, 3, 500, 4, 500, 5, 500,
+ 6, 250, 7, 125, 8, 62, 9, 31, 10, 1);
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Fetch records in 10 iterations, each time acknowledging with
RELEASE. 10 is the
+ // delivery limit hence we should see throttling from
Math.ceil(10/2) = 5 fetches.
+ for (int i = 0; i < 10; i++) {
+ int expectedFetchCount = expectedFetchCountMap.get(i + 1);
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, expectedFetchCount);
+ assertEquals(expectedFetchCount, records.count());
+ // Update delivery count for each offset.
+ records.forEach(record -> {
+ if
(!offsetToDeliveryCountMap.containsKey(record.offset())) {
+ offsetToDeliveryCountMap.put(record.offset(), 1);
+ } else {
+ offsetToDeliveryCountMap.put(record.offset(),
offsetToDeliveryCountMap.get(record.offset()) + 1);
+ }
+ });
+ // Acknowledge with RELEASE.
+ records.forEach(record -> shareConsumer.acknowledge(record,
AcknowledgeType.RELEASE));
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ assertEquals(Optional.empty(), result.get(new
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+ }
+
+ // Validate every offset is delivered at most till delivery limit.
+ waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(2500L));
+ if (!records.isEmpty()) {
+ records.forEach(record -> {
+ if
(!offsetToDeliveryCountMap.containsKey(record.offset())) {
+ offsetToDeliveryCountMap.put(record.offset(), 1);
+ } else {
+ offsetToDeliveryCountMap.put(record.offset(),
offsetToDeliveryCountMap.get(record.offset()) + 1);
+ }
+ });
+ records.forEach(record ->
shareConsumer.acknowledge(record, AcknowledgeType.RELEASE));
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ assertEquals(Optional.empty(),
+ result.get(new TopicIdPartition(tpId, tp.partition(),
tp.topic())));
+ }
+ return offsetToDeliveryCountMap.size() == 500 &&
+
offsetToDeliveryCountMap.values().stream().allMatch(deliveryCount ->
deliveryCount == 10);
+ },
+ 120000L, // 120 seconds.
+ 50L,
+ () -> "failed to get records till delivery limit"
+ );
+
+ // Next poll should not have any records as all records have
reached delivery limit.
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(2500L));
+ assertTrue(records.isEmpty(), "Records should be empty as all
records have reached delivery limit. But received: " + records.count());
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key = "group.share.delivery.count.limit",
value = "2"),
+ }
+ )
+ public void
testFetchWithThrottledDeliveryBatchesWithDecreasedDeliveryLimit() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(
+ ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT,
+ ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 512
+ ))
+ ) {
+ // Produce records in complete power of 2 to fully test the
throttling behavior.
+ int producedMessageCount = 512;
+ // Produce a batch of 512 messages
+ for (int i = 0; i < producedMessageCount; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(),
+ null, "key".getBytes(), ("Message " + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ shareConsumer.subscribe(List.of(tp.topic()));
+ // Fetch records in 2 iterations, each time acknowledging with
RELEASE. As throttling
+ // currently applies for delivery limit > 2, hence we should get
full batch in both fetches.
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 0, 512,
512);
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer, 0, 512,
512);
+ // Next poll should not have any records as all records have
reached delivery limit.
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(2500L));
+ assertTrue(records.isEmpty(), "Records should be empty as all
records have reached delivery limit. But received: " + records.count());
+ }
+ }
+
+ @ClusterTest
+ public void testFetchWithThrottledDeliveryBatchesMultipleConsumers() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer1 = createShareConsumer(
+ "group1",
+ Map.of(
+ ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT,
+ ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1
+ ));
+ ShareConsumer<byte[], byte[]> shareConsumer2 = createShareConsumer(
+ "group1",
+ Map.of(
+ ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT,
+ ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2
+ ))
+ ) {
+ // Produce 2 records in separate batches.
+ for (int i = 0; i < 2; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(),
+ null, "key".getBytes(), ("Message " + i).getBytes());
+ producer.send(record);
+ // Flush immediately to create 2 different batches.
+ producer.flush();
+ }
+
+ shareConsumer1.subscribe(List.of(tp.topic()));
+ shareConsumer2.subscribe(List.of(tp.topic()));
+ // Fetch from consumer1 - should get 1 record as
max.poll.records=1.
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer1, 2500L, 1);
+ assertEquals(1, records.count());
+ // Verify the first offset of the fetched records.
+ assertEquals(0, records.iterator().next().offset());
+ // Fetch from consumer2 - should get 1 record as offset 0 is
Acquired by consumer1.
+ // Release the record from consumer2 after fetching until the last
delivery attempt.
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer2, 1, 2,
1);
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer2, 1, 2,
1);
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer2, 1, 2,
1);
+ validateExpectedRecordsInEachPollAndRelease(shareConsumer2, 1, 2,
1);
+
+ // Now release the record from consumer1. Fetch again from
consumer2 to verify it gets the released record.
+ // And should only get 1 record at offset 0 as offset 1 record is
in final delivery attempt.
+ records.forEach(record -> shareConsumer1.acknowledge(record,
AcknowledgeType.RELEASE));
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer1.commitSync();
+ assertEquals(1, result.size());
+ assertEquals(Optional.empty(), result.get(new
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+ // Fetch from consumer2 - should get the released record at offset
0. Accept the record after fetching.
+ validateExpectedRecordsInEachPollAndAcknowledge(shareConsumer2, 0,
1, 1, AcknowledgeType.ACCEPT);
+ // Now fetch the last record at offset 1 from consumer2 in its
final delivery attempt.
+ validateExpectedRecordsInEachPollAndAcknowledge(shareConsumer2, 1,
2, 1, AcknowledgeType.ACCEPT);
+
+ // Next poll from consumer1 should not have any records as all
records have reached delivery limit.
+ records = shareConsumer1.poll(Duration.ofMillis(2500L));
+ assertTrue(records.isEmpty(), "Records should be empty as all
records have reached delivery limit. But received: " + records.count());
+ }
+ }
+
/**
* Util class to encapsulate state for a consumer/producer
* being executed by an {@link ExecutorService}.
@@ -3922,6 +4272,35 @@ public class ShareConsumerTest {
}
}
+ private void validateExpectedRecordsInEachPollAndRelease(
+ ShareConsumer<byte[], byte[]> shareConsumer,
+ int startOffset,
+ int lastOffset,
+ int expectedRecordsInEachPoll
+ ) {
+ validateExpectedRecordsInEachPollAndAcknowledge(shareConsumer,
startOffset, lastOffset, expectedRecordsInEachPoll, AcknowledgeType.RELEASE);
+ }
+
+ private void validateExpectedRecordsInEachPollAndAcknowledge(
+ ShareConsumer<byte[], byte[]> shareConsumer,
+ int startOffset,
+ int lastOffset,
+ int expectedRecordsInEachPoll,
+ AcknowledgeType acknowledgeType
+ ) {
+ for (int i = startOffset; i < lastOffset; i = i +
expectedRecordsInEachPoll) {
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, expectedRecordsInEachPoll);
+ assertEquals(expectedRecordsInEachPoll, records.count());
+ // Verify the first offset of the fetched records.
+ assertEquals(i, records.iterator().next().offset());
+
+ records.forEach(record -> shareConsumer.acknowledge(record,
acknowledgeType));
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ assertEquals(Optional.empty(), result.get(new
TopicIdPartition(tpId, tp.partition(), tp.topic())));
+ }
+ }
+
private void waitForAssignment(String groupId, List<TopicPartition> tps) {
try {
waitForCondition(() -> {
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index cb268e81e93..6f7041b92c0 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1930,7 +1930,14 @@ public class SharePartition {
// - maxDeliveryCount = 6, throttleRecordsDeliveryLimit = 3,
batch size = 500
// - deliveryCount = 3: maxFetchRecords = 500 >> (3 - 3 + 1)
= 250
// - deliveryCount = 4: maxFetchRecords = 500 >> (4 - 3 + 1)
= 125
- // The `maxFetchRecordsWhileThrottledRecords` is calculated
based on the first acquirable record that meets the throttling criteria in the
batch.
+ // The `maxFetchRecordsWhileThrottledRecords` is calculated
based on the first acquirable
+ // record that meets the throttling criteria in the batch.
Generally, when complete
+ // batch is erroring out and increases delivery count then the
front offset can have
+ // higher delivery count then the later offsets, for example
for a batch of 500 records
+ // and delivery limit of 10 the division will happen as 250,
125, 62, 31, 1 which means
+ // offset 124 will be at higher delivery count than offset
125. The calculation below
+ // deliver 124 once alone and then proceed with 125 onwards.
The code ensures that
+ // the records at higher delivery count are isolated first.
if (recordDeliveryCount >= throttleRecordsDeliveryLimit &&
maxFetchRecordsWhileThrottledRecords < 0) {
maxFetchRecordsWhileThrottledRecords = Math.max(1, (long)
inFlightBatch.offsetState().size() >> (recordDeliveryCount -
throttleRecordsDeliveryLimit + 1));
hasThrottledRecord = true;
@@ -2006,7 +2013,7 @@ public class SharePartition {
*
* @param inFlightBatch The in-flight batch to check for throttling.
* @param requestFirstOffset The first offset to acquire.
- * @param requestLastOffset THe last offset to acquire.
+ * @param requestLastOffset The last offset to acquire.
* @return True if the batch should be throttled (delivery count >=
threshold), false otherwise.
*/
private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch,
long requestFirstOffset, long requestLastOffset) {