This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new e8397fc8769 MINOR: Correcting the throttling condition for batch in
Share Partition (#20978) (#20988)
e8397fc8769 is described below
commit e8397fc8769f4ecd7905711521e088ae315e3429
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Nov 25 12:21:45 2025 +0000
MINOR: Correcting the throttling condition for batch in Share Partition
(#20978) (#20988)
The PR adds a check for the throttling condition so that it does not consider
an ongoing state transition for batches and offsets. Tests have been added and
corrected which verify the behaviour.
Reviewers: Andrew Schofield <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 14 ++-
.../kafka/server/share/SharePartitionTest.java | 117 ++++++++++++++++++++-
2 files changed, 125 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 78ab7c29897..ba3a142cf0c 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2027,7 +2027,14 @@ public class SharePartition {
*/
private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch,
long requestFirstOffset, long requestLastOffset) {
if (inFlightBatch.offsetState() == null) {
- return inFlightBatch.batchDeliveryCount() >=
throttleRecordsDeliveryLimit;
+ // If offsetState is null, it means the batch is not split and
represents a single batch.
+ // Check if the batch is in AVAILABLE state and has no ongoing
transition.
+ // The requested batch shall always be within the request first
and last offset as the sub
+ // map batches are only fetched to consider.
+ if (inFlightBatch.batchState() == RecordState.AVAILABLE &&
!inFlightBatch.batchHasOngoingStateTransition()) {
+ return inFlightBatch.batchDeliveryCount() >=
throttleRecordsDeliveryLimit;
+ }
+ return false;
}
return inFlightBatch.offsetState().entrySet().stream().filter(entry ->
{
@@ -2037,10 +2044,7 @@ public class SharePartition {
if (entry.getKey() > requestLastOffset) {
return false;
}
- if (entry.getValue().state() != RecordState.AVAILABLE) {
- return false;
- }
- return true;
+ return entry.getValue().state() == RecordState.AVAILABLE &&
!entry.getValue().hasOngoingStateTransition();
}).mapToInt(entry -> entry.getValue().deliveryCount()).max().orElse(0)
>= throttleRecordsDeliveryLimit;
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 73c0242b81f..8e8240377e6 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -11221,7 +11221,7 @@ public class SharePartitionTest {
PartitionFactory.newPartitionAllData(0, 3, 5L,
Errors.NONE.code(), Errors.NONE.message(),
List.of(
new PersisterStateBatch(15L, 19L,
RecordState.AVAILABLE.id, (short) 2),
- new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 2),
+ new PersisterStateBatch(20L, 22L,
RecordState.ARCHIVED.id, (short) 3),
new PersisterStateBatch(26L, 30L,
RecordState.AVAILABLE.id, (short) 3)))))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -11610,6 +11610,121 @@ public class SharePartitionTest {
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
}
+ @Test
+ public void testAcquisitionThrottlingWithOngoingStateTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 15L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 19L,
RecordState.AVAILABLE.id, (short) 1),
+ // Batch of 20-24 has been set to delivery count of 2
so in next acquisition it will be 3,
+ // and post that it should be throttled but because of
pending state transition it
+ // should not be throttled.
+ new PersisterStateBatch(20L, 24L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(25L, 29L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(30L, 34L,
RecordState.AVAILABLE.id, (short) 2),
+ // Similarly, batch of 35-39 has been set to delivery
count of 2 so in next offset
+ // acquisition, some offsets will be at 3 delivery
count, and post that offsets
+ // should be throttled but because of pending state
transition they will not be throttled.
+ new PersisterStateBatch(35, 39L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(40, 44L,
RecordState.ARCHIVED.id, (short) 5),
+ new PersisterStateBatch(45, 49L,
RecordState.AVAILABLE.id, (short) 1)))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ // Acquire batches 20-24 and 36-37 (offset based) and create a pending
state transition.
+ fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5);
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(20L).batchState());
+ fetchAcquiredRecords(sharePartition, memoryRecords(36, 2), 2);
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(35L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(35L).offsetState().get(36L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(35L).offsetState().get(37L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(38L).state());
+
+ // Create a pending future which will block state updates.
+ CompletableFuture<WriteShareGroupStateResult> future = new
CompletableFuture<>();
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+
+ // Release batch of 20-24 and offset 36-37, which will have pending
state transition.
+ sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(20, 24,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(36, 37,
List.of(AcknowledgeType.RELEASE.id))));
+
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(20L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(36L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(37L).state());
+
+
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(35L).hasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(36L).hasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(35L).offsetState().get(37L).hasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(35L).offsetState().get(38L).hasOngoingStateTransition());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 15, 5).close();
+ memoryRecordsBuilder(buffer, 20, 5).close();
+ memoryRecordsBuilder(buffer, 25, 5).close();
+ memoryRecordsBuilder(buffer, 30, 5).close();
+ memoryRecordsBuilder(buffer, 35, 5).close();
+ memoryRecordsBuilder(buffer, 40, 5).close();
+ memoryRecordsBuilder(buffer, 45, 5).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ // Acquire batches and batch 15-19, 25-29 will be acquired as batch
20-24 has pending state transition.
+ // Without pending transition, the acquisition would have happened
only for 20-24 batch as the batch
+ // 20-24 would have marked to be throttled but eventually couldn't be
acquired because of state transition.
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 10,
+ 15,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 10);
+
+ assertEquals(7, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(20L).batchState());
+
assertTrue(sharePartition.cachedState().get(20L).batchHasOngoingStateTransition());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(25L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(30L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(35L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(36L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(37L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(35L).offsetState().get(38L).state());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(40L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(45L).batchState());
+
+ // Re-trigger the acquisition and rest all the records will be
acquired, including the offsets
+ // ones. The throttling should not happen because of pending state
transition.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.BATCH_OPTIMIZED,
+ BATCH_SIZE,
+ 500,
+ 15,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 13);
+
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(30, 34, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(35, 35, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 38, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(39, 39, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(45, 49, 2));
+
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ }
+
/**
* This function produces transactional data of a given no. of records
followed by a transactional marker (COMMIT/ABORT).
*/