This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bcd3191c792 KAFKA-17541:[2/2] Improve handling of delivery count
(#20837)
bcd3191c792 is described below
commit bcd3191c792b90e1d17a0c3094ef7fe6b8655c53
Author: Lan Ding <[email protected]>
AuthorDate: Fri Nov 21 19:31:48 2025 +0800
KAFKA-17541:[2/2] Improve handling of delivery count (#20837)
For records with a delivery count exceeding 2, it is likely that
delivery failures stem from underlying issues (such as a corrupt record) rather than natural
retry scenarios, so the batching of such records should be reduced.
Solution: Determining which offset is bad is not possible at the broker's
end. However, the broker can restrict the acquired records to a subset so that only
the bad record is skipped. We can do the following:
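- If the delivery count of a batch is >= 3 (the throttle threshold is half of the max
delivery count, rounded up, with a minimum of 2; i.e. 3 for a max delivery count of 5), acquire only 1/2 of the
batch records, i.e. for a batch of 0-499 (500 records) with a batch delivery
count of 3, start offset tracking and acquire 0-249 (250 records).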
- If the delivery count is bumped again, keep acquiring 1/2 of the
previously acquired offsets until the last delivery attempt, i.e. 0-124 (125
records).
- For the last delivery attempt, acquire only 1 offset, so that only the bad
record will be skipped.
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>, Abhinav Dixit <[email protected]>
---------
Co-authored-by: d00791190 <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 90 ++++-
.../kafka/server/share/SharePartitionTest.java | 443 ++++++++++++++++++++-
2 files changed, 509 insertions(+), 24 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 73b6a863aa3..cb268e81e93 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -106,6 +106,11 @@ public class SharePartition {
private static final Logger log =
LoggerFactory.getLogger(SharePartition.class);
+ /**
+ * Lower bound for the delivery-count threshold at or above which record delivery is throttled.
+ */
+ private static final int MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT = 2;
+
/**
* empty member id used to indicate when a record is not acquired by any
member.
*/
@@ -201,6 +206,11 @@ public class SharePartition {
*/
private final int maxDeliveryCount;
+ /**
+ * Records whose delivery count reaches or exceeds this are deemed abnormal and the
batching of these records
+ * should be reduced. The limit is set to half of maxDeliveryCount rounded
up, with a minimum of MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT (2).
+ */
+ private final int throttleRecordsDeliveryLimit;
/**
* The group config manager is used to retrieve the values for dynamic
group configurations
*/
@@ -362,6 +372,7 @@ public class SharePartition {
this.leaderEpoch = leaderEpoch;
this.maxInFlightRecords = maxInFlightRecords;
this.maxDeliveryCount = maxDeliveryCount;
+ this.throttleRecordsDeliveryLimit =
Math.max(MINIMUM_THROTTLE_RECORDS_DELIVERY_LIMIT, (int) Math.ceil((double)
maxDeliveryCount / 2));
this.cachedState = new ConcurrentSkipListMap<>();
this.lock = new ReentrantReadWriteLock();
this.findNextFetchOffset = false;
@@ -834,7 +845,16 @@ public class SharePartition {
boolean fullMatch = checkForFullMatch(inFlightBatch,
firstBatch.baseOffset(), lastOffsetToAcquire);
int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
boolean recordLimitSubsetMatch = isRecordLimitMode &&
checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire,
acquiredCount);
- if (!fullMatch || inFlightBatch.offsetState() != null ||
recordLimitSubsetMatch) {
+ boolean throttleRecordsDelivery =
shouldThrottleRecordsDelivery(inFlightBatch, firstBatch.baseOffset(),
lastOffsetToAcquire);
+ // Stop acquiring more records if records delivery has to be
throttled. The throttling prevents
+ // complete batch to be archived in case of a single record
being corrupt.
+ // Below check isolates the current batch/offsets to be
delivered individually in subsequent fetches.
+ if (throttleRecordsDelivery && acquiredCount > 0) {
+ // Set the max records to acquire as 0 to prevent further
acquisition of records.
+ maxRecordsToAcquire = 0;
+ break;
+ }
+ if (!fullMatch || inFlightBatch.offsetState() != null ||
recordLimitSubsetMatch || throttleRecordsDelivery) {
log.trace("Subset or offset tracked batch record found for
share partition,"
+ " batch: {} request offsets - first: {}, last:
{} for the share"
+ " partition: {}-{}", inFlightBatch,
firstBatch.baseOffset(),
@@ -859,7 +879,14 @@ public class SharePartition {
// maxRecordsToAcquire. Hence, pass the remaining number
of records that can
// be acquired.
int acquiredSubsetCount =
acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining,
firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+
acquiredCount += acquiredSubsetCount;
+ // If records are throttled, return immediately and set
`maxRecordsToAcquire = 0`
+ // to prevent acquiring any new records afterwards.
+ if (throttleRecordsDelivery && acquiredSubsetCount > 0) {
+ maxRecordsToAcquire = 0;
+ break;
+ }
continue;
}
@@ -1866,6 +1893,8 @@ public class SharePartition {
) {
lock.writeLock().lock();
int acquiredCount = 0;
+ long maxFetchRecordsWhileThrottledRecords = -1;
+ boolean hasThrottledRecord = false;
try {
for (Map.Entry<Long, InFlightState> offsetState :
inFlightBatch.offsetState().entrySet()) {
// For the first batch which might have offsets prior to the
request base
@@ -1885,7 +1914,29 @@ public class SharePartition {
continue;
}
- InFlightState updateResult =
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE,
+ int recordDeliveryCount =
offsetState.getValue().deliveryCount();
+ // If the record is on last delivery attempt then isolate that
record to be delivered alone.
+ // If the respective record is corrupt then it prevents
increasing delivery count of multiple
+ // records in a single response batch. Condition below checks
if the current record has reached
+ // the delivery limit and already have some records to return
in response then skip processing
+ // the current record, which shall be delivered alone in next
fetch.
+ if (maxDeliveryCount > 2 && recordDeliveryCount ==
maxDeliveryCount - 1 && acquiredCount > 0) {
+ break;
+ }
+
+ // When record delivery count reach the throttle threshold,
progressively reduce batch size to isolate records.
+ // The `maxFetchRecordsWhileThrottledRecords` is halved with
each additional delivery attempt beyond the throttle limit.
+ // Example:
+ // - maxDeliveryCount = 6, throttleRecordsDeliveryLimit = 3,
batch size = 500
+ // - deliveryCount = 3: maxFetchRecords = 500 >> (3 - 3 + 1)
= 250
+ // - deliveryCount = 4: maxFetchRecords = 500 >> (4 - 3 + 1)
= 125
+ // The `maxFetchRecordsWhileThrottledRecords` is calculated
based on the first acquirable record that meets the throttling criteria in the
batch.
+ if (recordDeliveryCount >= throttleRecordsDeliveryLimit &&
maxFetchRecordsWhileThrottledRecords < 0) {
+ maxFetchRecordsWhileThrottledRecords = Math.max(1, (long)
inFlightBatch.offsetState().size() >> (recordDeliveryCount -
throttleRecordsDeliveryLimit + 1));
+ hasThrottledRecord = true;
+ }
+
+ InFlightState updateResult =
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE,
maxDeliveryCount, memberId);
if (updateResult == null || updateResult.state() !=
RecordState.ACQUIRED) {
log.trace("Unable to acquire records for the offset: {} in
batch: {}"
@@ -1904,10 +1955,18 @@ public class SharePartition {
.setLastOffset(offsetState.getKey())
.setDeliveryCount((short)
offsetState.getValue().deliveryCount()));
acquiredCount++;
+
+ // Delivered alone.
+ if (offsetState.getValue().deliveryCount() == maxDeliveryCount
&& maxDeliveryCount > 2) {
+ break;
+ }
if (isRecordLimitMode && acquiredCount == maxFetchRecords) {
// In record_limit mode, acquire only the requested number
of records.
break;
}
+ if (hasThrottledRecord && acquiredCount ==
maxFetchRecordsWhileThrottledRecords) {
+ break;
+ }
}
} finally {
lock.writeLock().unlock();
@@ -1942,6 +2001,33 @@ public class SharePartition {
return batchFirstOffset < localStartOffset && batchLastOffset >=
localStartOffset;
}
+ /**
+ * Check if the in-flight batch should be throttled based on delivery
count. A batch without offset tracking is judged by its batch delivery count; an offset-tracked batch is judged by the highest delivery count among its AVAILABLE offsets in [requestFirstOffset, requestLastOffset]. NOTE(review): the untracked-batch branch does not check the batch state, unlike the offset-tracked branch - confirm this is intended.
+ *
+ * @param inFlightBatch The in-flight batch to check for throttling.
+ * @param requestFirstOffset The first offset to acquire.
+ * @param requestLastOffset The last offset to acquire.
+ * @return True if the batch should be throttled (delivery count >=
throttleRecordsDeliveryLimit), false otherwise.
+ */
+ private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch,
long requestFirstOffset, long requestLastOffset) {
+ if (inFlightBatch.offsetState() == null) {
+ return inFlightBatch.batchDeliveryCount() >=
throttleRecordsDeliveryLimit;
+ }
+
+ return inFlightBatch.offsetState().entrySet().stream().filter(entry ->
{
+ if (entry.getKey() < requestFirstOffset) {
+ return false;
+ }
+ if (entry.getKey() > requestLastOffset) {
+ return false;
+ }
+ if (entry.getValue().state() != RecordState.AVAILABLE) {
+ return false;
+ }
+ return true;
+ }).mapToInt(entry -> entry.getValue().deliveryCount()).max().orElse(0)
>= throttleRecordsDeliveryLimit;
+ }
+
// Visibility for test
static Map<Long, Byte> fetchAckTypeMapForBatch(ShareAcknowledgementBatch
batch) {
// Client can either send a single entry in acknowledgeTypes which
represents the state
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 047fffa5171..c8be18c027e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -96,6 +96,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
@@ -1118,7 +1119,7 @@ public class SharePartitionTest {
List.of(
new PersisterStateBatch(15L, 18L,
RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
- new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
+ new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 1)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -1186,7 +1187,7 @@ public class SharePartitionTest {
List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(15, 18, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 2));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
assertEquals(31, sharePartition.nextFetchOffset());
@@ -1241,7 +1242,7 @@ public class SharePartitionTest {
List.of(
new PersisterStateBatch(15L, 18L,
RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
- new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
+ new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 1)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -1284,7 +1285,7 @@ public class SharePartitionTest {
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 2));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
@@ -1322,7 +1323,7 @@ public class SharePartitionTest {
List.of(
new PersisterStateBatch(15L, 18L,
RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
- new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
+ new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 1)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -1365,7 +1366,7 @@ public class SharePartitionTest {
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
assertEquals(28, sharePartition.nextFetchOffset());
@@ -1404,7 +1405,7 @@ public class SharePartitionTest {
List.of(
new PersisterStateBatch(15L, 18L,
RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
- new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
+ new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 1)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -1475,7 +1476,7 @@ public class SharePartitionTest {
List.of(
new PersisterStateBatch(15L, 18L,
RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
- new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
+ new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 1)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -1621,7 +1622,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
24);
- expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26,
30, 4));
+ expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26,
30, 2));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
@@ -1647,7 +1648,7 @@ public class SharePartitionTest {
List.of(
new PersisterStateBatch(15L, 18L,
RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
- new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
+ new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 1)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -1697,7 +1698,7 @@ public class SharePartitionTest {
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
assertEquals(28, sharePartition.nextFetchOffset());
@@ -1736,7 +1737,7 @@ public class SharePartitionTest {
List.of(
new PersisterStateBatch(15L, 18L,
RecordState.AVAILABLE.id, (short) 2),
new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
- new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
+ new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 1)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -4428,12 +4429,12 @@ public class SharePartitionTest {
// Allowing acquisition lock to expire to archive the records that
reach max delivery count.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
- () -> sharePartition.timer().size() == 0 &&
- sharePartition.nextFetchOffset() == 0 &&
- // After the second delivery attempt fails to
acknowledge the record correctly, the record should be archived.
- sharePartition.cachedState().get(10L).batchState() ==
RecordState.ARCHIVED &&
-
sharePartition.cachedState().get(10L).batchDeliveryCount() == 2 &&
-
sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
+ () -> sharePartition.timer().size() == 0 &&
+ sharePartition.nextFetchOffset() == 0 &&
+ // After the second delivery attempt fails to acknowledge
the record correctly, the record should be archived.
+ sharePartition.cachedState().get(10L).batchState() ==
RecordState.ARCHIVED &&
+ sharePartition.cachedState().get(10L).batchDeliveryCount()
== 2 &&
+
sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask() == null,
DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
() -> assertionFailedMessage(sharePartition, Map.of(10L,
List.of())));
// After the acquisition lock expires for the second time, the records
should be archived as the max delivery count is reached.
@@ -7782,7 +7783,7 @@ public class SharePartitionTest {
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
List.of(
- new PersisterStateBatch(11L, 20L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(11L, 20L,
RecordState.AVAILABLE.id, (short) 1),
new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21 to 30
))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
@@ -8083,11 +8084,11 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records, 10);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
fetchAcquiredRecords(sharePartition, records, 10);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.RELEASE.id))));
// All the records in the batch reached the max delivery count, hence
they got archived and the cached state cleared.
assertEquals(15, sharePartition.nextFetchOffset());
@@ -8121,7 +8122,7 @@ public class SharePartitionTest {
fetchAcquiredRecords(sharePartition, records2, 2);
sharePartition.acknowledge(MEMBER_ID, List.of(
- new ShareAcknowledgementBatch(13, 16,
List.of(AcknowledgeType.RELEASE.id))));
+ new ShareAcknowledgementBatch(13, 16,
List.of(AcknowledgeType.RELEASE.id))));
assertEquals(20, sharePartition.nextFetchOffset());
// Cached state will be empty because after the second release, the
acquired records will now have moved to
@@ -11191,6 +11192,404 @@ public class SharePartitionTest {
Mockito.verify(persister, Mockito.times(4)).writeState(Mockito.any());
}
+ @Test
+ public void testThrottleRecordsWhenPendingDeliveriesExist() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 19L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
+ new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertEquals(3, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(30, sharePartition.endOffset());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 7, 13).close();
+ memoryRecordsBuilder(buffer, 20, 8).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // Set max fetch records to 500, records should be acquired up to (but
excluding) offset 26 of the fetched batch.
+ // 16 records should be returned: 7-19, 23-25
+ // The record at offset 26 has a delivery count of 3 and is subject
to throttling; since records were already acquired, it is deferred to a later fetch.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 16);
+
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 19, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertEquals(26, sharePartition.nextFetchOffset());
+ assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+ assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(7L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(23L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(26L).batchState());
+ assertEquals(30L, sharePartition.endOffset());
+ assertEquals(3, sharePartition.deliveryCompleteCount());
+
+ // The record at offset 26 has a delivery count of 3 and is subject
to throttling;
+ // therefore the acquired batch size is halved (5 ->
2)
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 26,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 2);
+
+ expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26,
26, 4));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(27, 27, 4));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ }
+
+ @Test
+ public void
testAcquireRecordsHalvesBatchSizeOnEachFailureUntilSingleRecordOnLastAttempt()
throws InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 34L,
RecordState.AVAILABLE.id, (short) 4)))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withMaxDeliveryCount(7)
+
.withDefaultAcquisitionLockTimeoutMs(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS)
+ .build();
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(34, sharePartition.endOffset());
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 15, 20).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ // The record at offset 15 has a delivery count of 4 and is subject
to throttling (the throttle limit is max(2, ceil(7 / 2)) = 4).
+ // First throttled acquisition: batch size should be halved (20 ->
10)
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 10);
+ final List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(15, 15, 5));
+ IntStream.range(1, 10).forEach(i ->
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15 + i, 15 + i, 5)));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+
+ // Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS +
1);
+
+ // Second throttled acquisition: batch size halved again (now ~1/4 of original, 20
-> 5)
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 5);
+ final List<AcquiredRecords> expectedAcquiredRecords2 = new
ArrayList<>(expectedAcquiredRecord(15, 15, 6));
+ IntStream.range(1, 5).forEach(i ->
expectedAcquiredRecords2.addAll(expectedAcquiredRecord(15 + i, 15 + i, 6)));
+ assertArrayEquals(expectedAcquiredRecords2.toArray(),
acquiredRecordsList.toArray());
+
+ // Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS +
1);
+
+ List<AcquiredRecords> expectedLastAttemptAcquiredRecords;
+ // Last delivery attempt (delivery count 6 == maxDeliveryCount - 1): records are delivered individually.
+ for (int i = 0; i < 5; i++) {
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 1);
+ expectedLastAttemptAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(15 + i, 15 + i, 7));
+ assertArrayEquals(expectedLastAttemptAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ }
+
+ // The record at offset 20 has a delivery count of 5 and is subject
to throttling;
+ // this is its second throttled acquisition: batch size should be ~1/4 of
original, 20 -> 5
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 5);
+ final List<AcquiredRecords> expectedAcquiredRecords3 = new
ArrayList<>(expectedAcquiredRecord(20, 20, 6));
+ IntStream.range(1, 5).forEach(i ->
expectedAcquiredRecords3.addAll(expectedAcquiredRecord(20 + i, 20 + i, 6)));
+ assertArrayEquals(expectedAcquiredRecords3.toArray(),
acquiredRecordsList.toArray());
+
+ // The record at offset 25 has a delivery count of 4 and is subject
to throttling;
+ // this is its first throttled acquisition: batch size should be ~1/2 of
original, 20 -> 10
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 10);
+ final List<AcquiredRecords> expectedAcquiredRecords4 = new
ArrayList<>(expectedAcquiredRecord(25, 25, 5));
+ IntStream.range(1, 10).forEach(i ->
expectedAcquiredRecords4.addAll(expectedAcquiredRecord(25 + i, 25 + i, 5)));
+ assertArrayEquals(expectedAcquiredRecords4.toArray(),
acquiredRecordsList.toArray());
+ }
+
+ @Test
+ public void testLastAttemptRecordIsolationWithMixedDeliveryCount() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 15L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 34L,
RecordState.AVAILABLE.id, (short) 2)))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(15, sharePartition.startOffset());
+ assertEquals(34, sharePartition.endOffset());
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 15, 20).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 15,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 20);
+
+ // Release middle batch.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(17, 17,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(20, 20,
List.of(AcknowledgeType.RELEASE.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 15,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 2);
+
+ // Release the whole batch (offsets 15-34).
+ ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(15, 34,
List.of(AcknowledgeType.RELEASE.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).offsetState().get(15L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).offsetState().get(16L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).offsetState().get(17L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).offsetState().get(18L).state());
+ assertEquals(3,
sharePartition.cachedState().get(15L).offsetState().get(15L).deliveryCount());
+ assertEquals(3,
sharePartition.cachedState().get(15L).offsetState().get(16L).deliveryCount());
+ assertEquals(4,
sharePartition.cachedState().get(15L).offsetState().get(17L).deliveryCount());
+ assertEquals(3,
sharePartition.cachedState().get(15L).offsetState().get(18L).deliveryCount());
+
+ // The record at offset 17 (delivery count of 4) is on its last
attempt and should be delivered alone,
+ // so this acquisition correctly stops at offset 16.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 2);
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(15, 15, 4));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(16, 16, 4));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+
+ // The record at offset 17 should be delivered alone.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 1);
+ expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(17,
17, 5));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ }
+
+ @Test
+ public void
testAcquisitionNotThrottledIfHighDeliveryCountRecordNotAcquired() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 15L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 19L,
RecordState.AVAILABLE.id, (short) 1))))))); // persisted: offsets 15-19 AVAILABLE, delivery count 1
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(15, sharePartition.startOffset());
+ assertEquals(19, sharePartition.endOffset());
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 15, 5).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 15,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 5); // acquires offsets 15-19 (delivery count 1 -> 2)
+
+ // Release only the first record (offset 15).
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(15, 15,
List.of(AcknowledgeType.RELEASE.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 15,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 1); // re-acquires only offset 15, which stays ACQUIRED (delivery count 3)
+
+ // Release the remaining records (offsets 16-19); offset 15 stays acquired.
+ ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(16, 19,
List.of(AcknowledgeType.RELEASE.id))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(15L).offsetState().get(15L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).offsetState().get(16L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).offsetState().get(17L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).offsetState().get(18L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).offsetState().get(19L).state());
+ assertEquals(3,
sharePartition.cachedState().get(15L).offsetState().get(15L).deliveryCount());
+ assertEquals(2,
sharePartition.cachedState().get(15L).offsetState().get(16L).deliveryCount());
+ assertEquals(2,
sharePartition.cachedState().get(15L).offsetState().get(17L).deliveryCount());
+ assertEquals(2,
sharePartition.cachedState().get(15L).offsetState().get(18L).deliveryCount());
+ assertEquals(2,
sharePartition.cachedState().get(15L).offsetState().get(19L).deliveryCount());
+
+ // Not throttled: the high-delivery-count record (offset 15) is still
ACQUIRED from the previous fetch, so it is not part of this acquisition.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 5,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 4); // all four available records (16-19) are acquired together
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(16, 16, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(17, 17, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(18, 18, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 3));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ }
+
/**
* This function produces transactional data of a given no. of records
followed by a transactional marker (COMMIT/ABORT).
*/