This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25aae990334 KAFKA-19808: Handle batch alignment when share partition is at capacity (#20738)
25aae990334 is described below
commit 25aae990334d8bed14454b89dd54fc156cdaae81
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Oct 21 15:42:00 2025 +0100
KAFKA-19808: Handle batch alignment when share partition is at capacity (#20738)
The PR fixes a couple of issues related to batch alignment:
1. Base offset not found for some acquisition lock timeouts.
2. Gaps reported by the client when the topic cannot have any gaps.
The root cause was misaligned batches when the cache is full but the
fetch happens in the middle of the cached offsets, i.e. some records
have been released because of an acquisition lock timeout or explicitly.
A subsequent fetch can return fewer bytes of data, but the method
`lastOffsetAndMaxRecordsToAcquire` aligned lastOffsetToAcquire to
`endOffset`, hence incorrect batches were created. The PR fixes that by
adding a `min` comparison.
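For illustration, a minimal sketch of the clamping described above (the
local variables and values are simplified stand-ins, not the actual
SharePartition fields):

    // Illustrative sketch only, not the committed code: 'lastOffset' stands in for
    // the last offset of the data returned by the subsequent (smaller) fetch and
    // 'endOffset' for the share partition's end offset.
    long lastOffset = 14L;   // the smaller fetch ends before the partition's end offset
    long endOffset = 24L;
    // Aligning to endOffset (24) produced batches beyond the fetched data; clamping
    // with min keeps the acquisition within what was actually fetched.
    long lastOffsetToAcquire = Math.min(lastOffset, endOffset);   // 14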
Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 38 ++++++--
.../kafka/server/share/SharePartitionTest.java | 103 +++++++++++++++++++++
2 files changed, 131 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 2c330076733..222769ac875 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -373,7 +373,7 @@ public class SharePartition {
* or completes with an exception if the share partition is in non-initializable state.
*/
public CompletableFuture<Void> maybeInitialize() {
- log.debug("Maybe initialize share partition: {}-{}", groupId,
topicIdPartition);
+ log.trace("Maybe initialize share partition: {}-{}", groupId,
topicIdPartition);
// Check if the share partition is already initialized.
try {
if (initializedOrThrowException()) return CompletableFuture.completedFuture(null);
@@ -487,6 +487,8 @@ public class SharePartition {
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
+ log.debug("Initialized share partition: {}-{} with persister
read state: {}, cached state: {}",
+ groupId, topicIdPartition, result.topicsData(),
cachedState);
} catch (Exception e) {
throwable = e;
} finally {
@@ -738,16 +740,16 @@ public class SharePartition {
baseOffset = floorEntry.getKey();
}
// Validate if the fetch records are already part of existing batches and if available.
- NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
+ NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastOffsetToAcquire, true);
// No overlap with request offsets in the cache for in-flight records. Acquire the complete
// batch.
if (subMap.isEmpty()) {
log.trace("No cached data exists for the share partition for
requested fetch batch: {}-{}",
groupId, topicIdPartition);
- // Do not send the lastOffsetToAcquire as when the subMap is empty, it means that
- // there isn't any overlap itself.
+ // It's safe to use lastOffsetToAcquire instead of lastBatch.lastOffset() because there is no
+ // overlap hence the lastOffsetToAcquire is same as lastBatch.lastOffset() or before that.
ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
- firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxRecordsToAcquire);
+ firstBatch.baseOffset(), lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
}
@@ -776,6 +778,15 @@ public class SharePartition {
// If nextBatchStartOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
// Thus, a new batch needs to be acquired for the gap.
if (maybeGapStartOffset < entry.getKey()) {
+ // It's safe to use entry.getKey() - 1 as the last offset to acquire for the
+ // gap as the sub map should contain either the next batch or this line should
+ // not have been executed i.e. say there is a gap from 10-20 and cache contains
+ // [0-9, 21-30], when fetch returns single/multiple batches from 0-15, then
+ // first sub map entry has no gap and there exists only 1 entry in sub map.
+ // Hence, for next batch the following code will not be executed and records
+ // from 10-15 will be acquired later in the code. In other case, when
+ // fetch returns batches from 0-25, then the sub map will have 2 entries and
+ // gap will be computed correctly.
ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
maybeGapStartOffset, entry.getKey() - 1, batchSize, maxRecordsToAcquire);
result.addAll(shareAcquiredRecords.acquiredRecords());
@@ -791,13 +802,13 @@ public class SharePartition {
}
// Compute if the batch is a full match.
- boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset());
+ boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
if (!fullMatch || inFlightBatch.offsetState() != null) {
log.trace("Subset or offset tracked batch record found for
share partition,"
+ " batch: {} request offsets - first: {}, last:
{} for the share"
+ " partition: {}-{}", inFlightBatch,
firstBatch.baseOffset(),
- lastBatch.lastOffset(), groupId, topicIdPartition);
+ lastOffsetToAcquire, groupId, topicIdPartition);
if (inFlightBatch.offsetState() == null) {
// Though the request is a subset of in-flight batch but the offset
// tracking has not been initialized yet which means that we could only
@@ -884,7 +895,7 @@ public class SharePartition {
CompletableFuture<Void> future = new CompletableFuture<>();
Throwable throwable = null;
- List<PersisterBatch> persisterBatches = new ArrayList<>();
+ List<PersisterBatch> persisterBatches = new ArrayList<>();
lock.writeLock().lock();
try {
// Avoided using enhanced for loop as need to check if the last batch have offsets
@@ -1518,8 +1529,15 @@ public class SharePartition {
// in-flight records limit. This can happen when the fetch is happening in-between
// the in-flight batches and the share partition has reached the max in-flight records limit.
maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset() - fetchOffset + 1));
- // Adjust the last offset to acquire to the endOffset of the share partition.
- lastOffsetToAcquire = endOffset();
+ // Adjust the last offset to acquire to the minimum of fetched data's last or
+ // endOffset of the share partition. This is required as partition fetch bytes
+ // are dynamic and subsequent fetches can fetch lesser data i.e. lastOffset can
+ // be lesser than endOffset.
+ lastOffsetToAcquire = Math.min(lastOffset, endOffset());
+ log.debug("Share partition {}-{} is at max in-flight records limit: {}. "
+ + "However, fetch is happening in-between the in-flight batches, hence adjusting "
+ + "last offset to: {} and max records to: {}", groupId, topicIdPartition,
+ maxInFlightRecords, lastOffsetToAcquire, maxRecordsToAcquire);
} else {
// The share partition is already at max in-flight records, hence cannot acquire more records.
log.debug("Share partition {}-{} has reached max in-flight records limit: {}. Cannot acquire more records, inflight records count: {}",
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index b3068abbeda..70f2099bd6a 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -2323,6 +2323,109 @@ public class SharePartitionTest {
assertEquals(30, sharePartition.nextFetchOffset());
}
+ @Test
+ public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecords() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .withMaxInflightRecords(20)
+ .build();
+
+ // Acquire records, should be acquired till maxInFlightRecords i.e. 25 records till 24 offset.
+ fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10);
+ fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10);
+
+ // Validate 3 batches are fetched and fourth batch should be skipped. Max in-flight records
+ // limit is reached.
+ assertEquals(3, sharePartition.cachedState().size());
+ assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+ assertEquals(4, sharePartition.cachedState().get(0L).lastOffset());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(14, sharePartition.cachedState().get(5L).lastOffset());
+ assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
+ assertEquals(24, sharePartition.cachedState().get(15L).lastOffset());
+ assertEquals(25, sharePartition.nextFetchOffset());
+
+ // Release middle batch.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 2))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+ // Validate the nextFetchOffset is updated to 5.
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ // The complete released batch should be acquired but not any other batch as the lastOffset
+ // is adjusted according to the minimum of fetched batch and endOffset.
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ 500 /* Max fetch records */,
+ 5 /* Fetch Offset */,
+ fetchPartitionData(memoryRecords(5, 10), 0),
+ FETCH_ISOLATION_HWM),
+ 10);
+
+ // Validate 1 batch is fetched, with 10 records till end of batch.
+ assertArrayEquals(expectedAcquiredRecord(5, 14, 2).toArray(), acquiredRecordsList.toArray());
+ assertEquals(25, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecordsOverlap() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .withMaxInflightRecords(20)
+ .build();
+
+ // Acquire records, should be acquired till maxInFlightRecords i.e. 25 records till 24 offset.
+ fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10);
+
+ // Validate 4 batches are fetched and fourth batch should be skipped. Max in-flight records
+ // limit is reached.
+ assertEquals(4, sharePartition.cachedState().size());
+ assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+ assertEquals(4, sharePartition.cachedState().get(0L).lastOffset());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(9, sharePartition.cachedState().get(5L).lastOffset());
+ assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+ assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+ assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
+ assertEquals(24, sharePartition.cachedState().get(15L).lastOffset());
+ assertEquals(25, sharePartition.nextFetchOffset());
+
+ // Release only 1 middle batch.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+ // Validate the nextFetchOffset is updated to 5.
+ assertEquals(5, sharePartition.nextFetchOffset());
+
+ // Adjust the max fetch records to 6 so it's just 1 record more than the released batch size.
+ // This shall not impact the acquired records as only the released batch should be acquired.
+ // However, this previously caused an issue where the subset of records were acquired from the
+ // next batch due to incorrect calculation.
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ 6 /* Max fetch records */,
+ 5 /* Fetch Offset */,
+ fetchPartitionData(memoryRecords(5, 5), 0),
+ FETCH_ISOLATION_HWM),
+ 5);
+
+ // Validate 1 batch is fetched, with 5 records till end of batch.
+ assertArrayEquals(expectedAcquiredRecord(5, 9, 2).toArray(), acquiredRecordsList.toArray());
+ assertEquals(25, sharePartition.nextFetchOffset());
+ }
+
@Test
public void testNextFetchOffsetInitialState() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();