This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 875537f54b7 KAFKA-19555: Restrict records acquisition post max in-flight limit (#20253)
875537f54b7 is described below

commit 875537f54b718ea5278a0f4ffd09e2b8ed84e7fe
Author:     Apoorv Mittal <apoorvmitta...@gmail.com>
AuthorDate: Tue Jul 29 10:40:06 2025 +0100

    KAFKA-19555: Restrict records acquisition post max in-flight limit (#20253)

    The PR restricts the records being acquired post max-inflight limit.
    Previously the max in-flight limit was only enforced while considering
    the share partition for further fetches i.e. once the limit was reached
    the share partition was not considered for further fetches. However,
    when the records are actively released then there might be some records
    being acquired post max-inflight limit. This is evident with higher
    number of consumers reading from same share partition and releasing the
    records.

    Reviewers: Andrew Schofield <aschofi...@confluent.io>, Lan Ding <isdin...@163.com>
---
 .../java/kafka/server/share/SharePartition.java    |  85 ++++++++++--
 .../kafka/server/share/SharePartitionTest.java     | 144 +++++++++++++++++++++
 .../share/metrics/SharePartitionMetrics.java       |   2 +-
 3 files changed, 218 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 891e37f8af7..dbd7e5e1730 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -656,7 +656,7 @@ public class SharePartition {
      * @param isolationLevel The isolation level for the share fetch request.
      * @return The acquired records for the share partition.
      */
-    @SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression
+    @SuppressWarnings({"cyclomaticcomplexity", "methodlength"}) // Consider refactoring to avoid suppression
     public ShareAcquiredRecords acquire(
         String memberId,
         int batchSize,
@@ -677,6 +677,16 @@ public class SharePartition {
             return ShareAcquiredRecords.empty();
         }
 
+        LastOffsetAndMaxRecords lastOffsetAndMaxRecords = lastOffsetAndMaxRecordsToAcquire(fetchOffset,
+            maxFetchRecords, lastBatch.lastOffset());
+        if (lastOffsetAndMaxRecords.maxRecords() <= 0) {
+            return ShareAcquiredRecords.empty();
+        }
+        // The lastOffsetAndMaxRecords contains the last offset to acquire and the maximum number of records
+        // to acquire.
+        int maxRecordsToAcquire = lastOffsetAndMaxRecords.maxRecords();
+        long lastOffsetToAcquire = lastOffsetAndMaxRecords.lastOffset();
+
         // We require the first batch of records to get the base offset. Stop parsing further
         // batches.
         RecordBatch firstBatch = fetchPartitionData.records.batches().iterator().next();
@@ -708,8 +718,10 @@ public class SharePartition {
         if (subMap.isEmpty()) {
             log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
                 groupId, topicIdPartition);
+            // Do not send the lastOffsetToAcquire as when the subMap is empty, it means that
+            // there isn't any overlap itself.
             ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
-                firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxRecordsToAcquire);
             return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
         }
 
@@ -726,7 +738,7 @@ public class SharePartition {
         // be an exact match, subset or span over multiple already fetched batches.
         for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
             // If the acquired count is equal to the max fetch records then break the loop.
-            if (acquiredCount >= maxFetchRecords) {
+            if (acquiredCount >= maxRecordsToAcquire) {
                 break;
             }
 
@@ -739,14 +751,14 @@ public class SharePartition {
             // Thus, a new batch needs to be acquired for the gap.
             if (maybeGapStartOffset < entry.getKey()) {
                 ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
-                    maybeGapStartOffset, entry.getKey() - 1, batchSize, maxFetchRecords);
+                    maybeGapStartOffset, entry.getKey() - 1, batchSize, maxRecordsToAcquire);
                 result.addAll(shareAcquiredRecords.acquiredRecords());
                 acquiredCount += shareAcquiredRecords.count();
             }
             // Set nextBatchStartOffset as the last offset of the current in-flight batch + 1
             maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
             // If the acquired count is equal to the max fetch records then break the loop.
-            if (acquiredCount >= maxFetchRecords) {
+            if (acquiredCount >= maxRecordsToAcquire) {
                 break;
             }
         }
@@ -778,7 +790,7 @@ public class SharePartition {
                 // Do not send max fetch records to acquireSubsetBatchRecords as we want to acquire
                 // all the records from the batch as the batch will anyway be part of the file-records
                 // response batch.
-                int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result);
+                int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
                 acquiredCount += acquiredSubsetCount;
                 continue;
             }
@@ -810,11 +822,11 @@ public class SharePartition {
 
         // Some of the request offsets are not found in the fetched batches. Acquire the
        // missing records as well.
-        if (acquiredCount < maxFetchRecords && subMap.lastEntry().getValue().lastOffset() < lastBatch.lastOffset()) {
+        if (acquiredCount < maxRecordsToAcquire && subMap.lastEntry().getValue().lastOffset() < lastOffsetToAcquire) {
             log.trace("There exists another batch which needs to be acquired as well");
             ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                 subMap.lastEntry().getValue().lastOffset() + 1,
-                lastBatch.lastOffset(), batchSize, maxFetchRecords - acquiredCount);
+                lastOffsetToAcquire, batchSize, maxRecordsToAcquire - acquiredCount);
             result.addAll(shareAcquiredRecords.acquiredRecords());
             acquiredCount += shareAcquiredRecords.count();
         }
@@ -1385,14 +1397,14 @@ public class SharePartition {
         sharePartitionMetrics.registerInFlightBatchCount(this.cachedState::size);
     }
 
-    private long numInFlightRecords() {
+    private int numInFlightRecords() {
         lock.readLock().lock();
-        long numRecords;
+        int numRecords;
         try {
             if (cachedState.isEmpty()) {
                 numRecords = 0;
             } else {
-                numRecords = this.endOffset - this.startOffset + 1;
+                numRecords = (int) (this.endOffset - this.startOffset + 1);
             }
         } finally {
             lock.readLock().unlock();
         }
@@ -1449,6 +1461,46 @@ public class SharePartition {
         }
     }
 
+    private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
+        // There can always be records fetched exceeding the max in-flight messages limit. Hence,
+        // we need to check if the share partition has reached the max in-flight messages limit
+        // and only acquire limited records.
+        int maxRecordsToAcquire;
+        long lastOffsetToAcquire = lastOffset;
+        lock.readLock().lock();
+        try {
+            int inFlightRecordsCount = numInFlightRecords();
+            // Take minimum of maxFetchRecords and remaining capacity to fill max in-flight messages limit.
+            maxRecordsToAcquire = Math.min(maxFetchRecords, maxInFlightMessages - inFlightRecordsCount);
+            // If the maxRecordsToAcquire is less than or equal to 0, then ideally (check exists to not
+            // fetch records for share partitions which are at capacity) the fetch must be happening
+            // in-between the in-flight batches i.e. some in-flight records have been released (marked
+            // re-available). In such case, last offset to acquire should be adjusted to the endOffset
+            // of the share partition, if not adjusted then the records can be acquired post the endOffset.
+            // For example, if 30 records are already acquired i.e. [0-29] and single offset 20 is released
+            // then the next fetch request will be at 20. Difference from endOffset will be 10, which
+            // means that some offset past the endOffset can be acquired (21-29 are already acquired).
+            // Hence, the lastOffsetToAcquire should be adjusted to the endOffset.
+            if (maxRecordsToAcquire <= 0) {
+                if (fetchOffset <= endOffset()) {
+                    // Adjust the max records to acquire to the capacity available to fill the max
+                    // in-flight messages limit. This can happen when the fetch is happening in-between
+                    // the in-flight batches and the share partition has reached the max in-flight messages limit.
+                    maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset() - fetchOffset + 1));
+                    // Adjust the last offset to acquire to the endOffset of the share partition.
+                    lastOffsetToAcquire = endOffset();
+                } else {
+                    // The share partition is already at max in-flight messages, hence cannot acquire more records.
+                    log.debug("Share partition {}-{} has reached max in-flight messages limit: {}. Cannot acquire more records, inflight records count: {}",
+                        groupId, topicIdPartition, maxInFlightMessages, inFlightRecordsCount);
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+        return new LastOffsetAndMaxRecords(lastOffsetToAcquire, maxRecordsToAcquire);
+    }
+
     private ShareAcquiredRecords acquireNewBatchRecords(
         String memberId,
         Iterable<? extends RecordBatch> batches,
@@ -2242,7 +2294,7 @@ public class SharePartition {
                 .setGroupId(this.groupId)
                 .setTopicsData(List.of(new TopicData<>(topicIdPartition.topicId(),
                     List.of(PartitionFactory.newPartitionStateBatchData(
-                        topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
+                        topicIdPartition.partition(), stateEpoch, startOffset(), leaderEpoch, stateBatches))))
                 ).build()).build())
             .whenComplete((result, exception) -> {
                 if (exception != null) {
@@ -2803,4 +2855,13 @@ public class SharePartition {
             this.offsetMetadata = offsetMetadata;
         }
     }
+
+    /**
+     * LastOffsetAndMaxRecords class is used to track the last offset to acquire and the maximum number
+     * of records that can be acquired in a fetch request.
+     */
+    private record LastOffsetAndMaxRecords(
+        long lastOffset,
+        int maxRecords
+    ) { }
 }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index a0bdfee7b5f..c4fc0a0454d 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -1510,6 +1510,150 @@ public class SharePartitionTest {
         assertTrue(sharePartition.cachedState().containsKey(12L));
     }
 
+    @Test
+    public void testAcquireWithMaxInFlightMessagesAndTryAcquireNewBatch() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .withMaxInflightMessages(20)
+            .build();
+
+        // Acquire records, all 10 records should be acquired as within maxInflightMessages limit.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(memoryRecords(10, 0), 0),
+            FETCH_ISOLATION_HWM),
+            10);
+        // Validate all 10 records will be acquired as the maxInFlightMessages is 20.
+        assertArrayEquals(expectedAcquiredRecord(0, 9, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        // Create 4 batches of records.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 10, 15).close();
+        memoryRecordsBuilder(buffer, 5, 25).close();
+        memoryRecordsBuilder(buffer, 2, 30).close();
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Acquire records, should be acquired till maxInFlightMessages i.e. 20 records. As second batch
+        // is ending at 24 offset, hence additional 15 records will be acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 0),
+            FETCH_ISOLATION_HWM),
+            15);
+
+        // Validate 2 batches are fetched one with 5 records and other till end of batch, third batch
+        // should be skipped.
+        assertArrayEquals(expectedAcquiredRecord(10, 24, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(25, sharePartition.nextFetchOffset());
+
+        // Should not acquire any records as the share partition is at capacity and fetch offset is beyond
+        // the end offset.
+        fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            25 /* Fetch Offset */,
+            fetchPartitionData(memoryRecords(10, 25), 10),
+            FETCH_ISOLATION_HWM),
+            0);
+
+        assertEquals(25, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireWithMaxInFlightMessagesAndReleaseLastOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .withMaxInflightMessages(20)
+            .build();
+
+        // Create 4 batches of records.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 10).close();
+        memoryRecordsBuilder(buffer, 10, 15).close();
+        memoryRecordsBuilder(buffer, 5, 25).close();
+        memoryRecordsBuilder(buffer, 3, 30).close();
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire records, should be acquired till maxInFlightMessages i.e. 20 records till 29 offset.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            20);
+
+        // Validate 3 batches are fetched and fourth batch should be skipped. Max in-flight messages
+        // limit is reached.
+        assertArrayEquals(expectedAcquiredRecord(10, 29, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(30, sharePartition.nextFetchOffset());
+
+        // Release middle batch.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(15, 19, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        // Validate the nextFetchOffset is updated to 15.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // The complete released batch should be acquired but not the last batch, starting at offset 30,
+        // as the lastOffset is adjusted according to the endOffset.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            15 /* Fetch Offset */,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            5);
+
+        // Validate 1 batch is fetched, with 5 records till end of batch, last available batch should
+        // not be acquired
+        assertArrayEquals(expectedAcquiredRecords(15, 19, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(30, sharePartition.nextFetchOffset());
+
+        // Release last offset of the acquired batch. Only 1 record should be released and later acquired.
+        ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(29, 29, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        // Validate the nextFetchOffset is updated to 29.
+        assertEquals(29, sharePartition.nextFetchOffset());
+
+        // Only the last record of the acquired batch should be acquired again.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500 /* Max fetch records */,
+            29 /* Fetch Offset */,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            1);
+
+        // Validate 1 record is acquired.
+        assertArrayEquals(expectedAcquiredRecord(29, 29, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(30, sharePartition.nextFetchOffset());
+    }
+
     @Test
     public void testNextFetchOffsetInitialState() {
         SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
diff --git a/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java b/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java
index 6ea94d46374..e83a3932402 100644
--- a/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java
+++ b/server/src/main/java/org/apache/kafka/server/share/metrics/SharePartitionMetrics.java
@@ -96,7 +96,7 @@ public class SharePartitionMetrics implements AutoCloseable {
      *
      * @param messageCountSupplier The supplier for the in-flight message count.
      */
-    public void registerInFlightMessageCount(Supplier<Long> messageCountSupplier) {
+    public void registerInFlightMessageCount(Supplier<Integer> messageCountSupplier) {
         metricsGroup.newGauge(
             IN_FLIGHT_MESSAGE_COUNT,
             messageCountSupplier,
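
For illustration only, below is a minimal, standalone sketch of the clamping rule this commit adds in lastOffsetAndMaxRecordsToAcquire(). The class, method, and parameter names are simplified stand-ins rather than the actual SharePartition code, which performs the equivalent computation under the partition's read lock and with its own state accessors:

// Hypothetical, self-contained sketch; not the SharePartition implementation.
public final class AcquireLimitSketch {

    // Stand-in for the private LastOffsetAndMaxRecords record added by this commit.
    record LastOffsetAndMaxRecords(long lastOffset, int maxRecords) { }

    static LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(
        long fetchOffset,        // offset the fetch starts at
        int maxFetchRecords,     // max records requested by the fetch
        long lastFetchedOffset,  // last offset of the fetched batches
        long endOffset,          // current end offset of the share partition
        int maxInFlightMessages, // configured in-flight limit
        int inFlightRecords      // records currently in flight
    ) {
        // Cap the number of records to acquire by the remaining in-flight capacity.
        int maxRecords = Math.min(maxFetchRecords, maxInFlightMessages - inFlightRecords);
        long lastOffset = lastFetchedOffset;
        if (maxRecords <= 0 && fetchOffset <= endOffset) {
            // At capacity, but the fetch lands inside the in-flight range because some
            // records were released: re-acquire only up to the current endOffset so that
            // nothing past the endOffset is acquired.
            maxRecords = Math.min(maxFetchRecords, (int) (endOffset - fetchOffset + 1));
            lastOffset = endOffset;
        }
        // If maxRecords is still <= 0, the caller acquires nothing.
        return new LastOffsetAndMaxRecords(lastOffset, maxRecords);
    }

    public static void main(String[] args) {
        // 30 records [0-29] in flight with a limit of 30; offset 20 was released and re-fetched.
        System.out.println(lastOffsetAndMaxRecordsToAcquire(20, 500, 49, 29, 30, 30));
    }
}

Running the sketch with the example from the commit's code comment (30 in-flight records [0-29], limit 30, offset 20 released and re-fetched) yields maxRecords = 10 and lastOffset = 29, i.e. only the released range up to the endOffset can be re-acquired.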