This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25aae990334 KAFKA-19808: Handle batch alignment when share partition is at capacity (#20738)
25aae990334 is described below

commit 25aae990334d8bed14454b89dd54fc156cdaae81
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Oct 21 15:42:00 2025 +0100

    KAFKA-19808: Handle batch alignment when share partition is at capacity (#20738)
    
    The PR fixes a couple of issues related to batch alignment:
    
    1. Base offset not found for some acquisition lock timeouts.
    2. Gaps reported by the client when the topic cannot have any gaps.
    
    The root cause was misaligned batches when the cache is full but the
    fetch happens in the middle of the cached offsets, i.e. some records
    are released because of an acquisition lock timeout or explicitly.
    A subsequent fetch can return fewer bytes of data, but the method
    `lastOffsetAndMaxRecordsToAcquire` aligned lastOffsetToAcquire to
    `endOffset`, hence incorrect batches were created. The PR fixes that
    by adding a `min` comparison.
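    
    For illustration, a minimal sketch of the adjusted alignment. The class,
    method and variable names below are simplified stand-ins for the logic
    described above, not the production code:
    
        // Illustrative sketch only; names are hypothetical stand-ins.
        public class LastOffsetAlignmentSketch {
    
            // Last offset that may be acquired when the share partition is
            // already at its max in-flight records limit and the fetch
            // happened in the middle of the cached offsets.
            static long lastOffsetToAcquire(long fetchLastOffset, long endOffset) {
                // Before the fix this was simply endOffset, so a fetch that
                // returned fewer bytes (fetchLastOffset < endOffset) produced
                // misaligned batches. Clamping with min keeps the acquired
                // range within the data that was actually fetched.
                return Math.min(fetchLastOffset, endOffset);
            }
    
            public static void main(String[] args) {
                // Cache holds offsets 0-24 (endOffset = 24); a released middle
                // batch 5-14 is re-fetched, so the fetch returns data only up
                // to offset 14.
                System.out.println(lastOffsetToAcquire(14L, 24L)); // 14, not 24
            }
        }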
    
    Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  38 ++++++--
 .../kafka/server/share/SharePartitionTest.java     | 103 +++++++++++++++++++++
 2 files changed, 131 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 2c330076733..222769ac875 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -373,7 +373,7 @@ public class SharePartition {
     *         or completes with an exception if the share partition is in non-initializable state.
      */
     public CompletableFuture<Void> maybeInitialize() {
-        log.debug("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
+        log.trace("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
         // Check if the share partition is already initialized.
         try {
            if (initializedOrThrowException()) return CompletableFuture.completedFuture(null);
@@ -487,6 +487,8 @@ public class SharePartition {
                 }
                 // Set the partition state to Active and complete the future.
                 partitionState = SharePartitionState.ACTIVE;
+                log.debug("Initialized share partition: {}-{} with persister read state: {}, cached state: {}",
+                    groupId, topicIdPartition, result.topicsData(), cachedState);
             } catch (Exception e) {
                 throwable = e;
             } finally {
@@ -738,16 +740,16 @@ public class SharePartition {
                 baseOffset = floorEntry.getKey();
             }
            // Validate if the fetch records are already part of existing batches and if available.
-            NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
+            NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastOffsetToAcquire, true);
            // No overlap with request offsets in the cache for in-flight records. Acquire the complete
             // batch.
             if (subMap.isEmpty()) {
                log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
                     groupId, topicIdPartition);
-                // Do not send the lastOffsetToAcquire as when the subMap is empty, it means that
-                // there isn't any overlap itself.
+                // It's safe to use lastOffsetToAcquire instead of lastBatch.lastOffset() because there is no
+                // overlap, hence lastOffsetToAcquire is the same as lastBatch.lastOffset() or earlier.
                ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
-                    firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxRecordsToAcquire);
+                    firstBatch.baseOffset(), lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
                return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
             }
 
@@ -776,6 +778,15 @@ public class SharePartition {
                    // If nextBatchStartOffset is less than the key of the entry, this means the fetch happened for a gap in the cachedState.
                     // Thus, a new batch needs to be acquired for the gap.
                     if (maybeGapStartOffset < entry.getKey()) {
+                        // It's safe to use entry.getKey() - 1 as the last offset to acquire for the
+                        // gap, as the sub map must contain the next batch, otherwise this line would
+                        // not be executed. For example, say there is a gap from 10-20 and the cache
+                        // contains [0-9, 21-30]. When the fetch returns single/multiple batches from
+                        // 0-15, the first sub map entry has no gap and the sub map has only 1 entry.
+                        // Hence, this code will not be executed for the next batch, and records
+                        // from 10-15 will be acquired later in the code. In the other case, when the
+                        // fetch returns batches from 0-25, the sub map will have 2 entries and the
+                        // gap will be computed correctly.
                        ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                            maybeGapStartOffset, entry.getKey() - 1, batchSize, maxRecordsToAcquire);
                         result.addAll(shareAcquiredRecords.acquiredRecords());
@@ -791,13 +802,13 @@ public class SharePartition {
                 }
 
                 // Compute if the batch is a full match.
-                boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset());
+                boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
 
                 if (!fullMatch || inFlightBatch.offsetState() != null) {
                    log.trace("Subset or offset tracked batch record found for share partition,"
                            + " batch: {} request offsets - first: {}, last: {} for the share"
                            + " partition: {}-{}", inFlightBatch, firstBatch.baseOffset(),
-                        lastBatch.lastOffset(), groupId, topicIdPartition);
+                        lastOffsetToAcquire, groupId, topicIdPartition);
                     if (inFlightBatch.offsetState() == null) {
                        // Though the request is a subset of in-flight batch but the offset
                        // tracking has not been initialized yet which means that we could only
@@ -884,7 +895,7 @@ public class SharePartition {
 
         CompletableFuture<Void> future = new CompletableFuture<>();
         Throwable throwable = null;
-        List<PersisterBatch>  persisterBatches = new ArrayList<>();
+        List<PersisterBatch> persisterBatches = new ArrayList<>();
         lock.writeLock().lock();
         try {
            // Avoided using enhanced for loop as need to check if the last batch have offsets
@@ -1518,8 +1529,15 @@ public class SharePartition {
                    // in-flight records limit. This can happen when the fetch is happening in-between
                    // the in-flight batches and the share partition has reached the max in-flight records limit.
                    maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset() - fetchOffset + 1));
-                    // Adjust the last offset to acquire to the endOffset of the share partition.
-                    lastOffsetToAcquire = endOffset();
+                    // Adjust the last offset to acquire to the minimum of the fetched data's last
+                    // offset and the endOffset of the share partition. This is required as partition
+                    // fetch bytes are dynamic and subsequent fetches can return less data, i.e.
+                    // lastOffset can be lower than endOffset.
+                    lastOffsetToAcquire = Math.min(lastOffset, endOffset());
+                    log.debug("Share partition {}-{} is at max in-flight records limit: {}. "
+                            + "However, fetch is happening in-between the in-flight batches, hence adjusting "
+                            + "last offset to: {} and max records to: {}", groupId, topicIdPartition,
+                        maxInFlightRecords, lastOffsetToAcquire, maxRecordsToAcquire);
                 } else {
                    // The share partition is already at max in-flight records, hence cannot acquire more records.
                    log.debug("Share partition {}-{} has reached max in-flight records limit: {}. Cannot acquire more records, inflight records count: {}",
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index b3068abbeda..70f2099bd6a 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -2323,6 +2323,109 @@ public class SharePartitionTest {
         assertEquals(30, sharePartition.nextFetchOffset());
     }
 
+    @Test
+    public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecords() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .withMaxInflightRecords(20)
+            .build();
+
+        // Acquire records, should be acquired till maxInFlightRecords i.e. 25 records till 24 offset.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10);
+
+        // Validate 3 batches are fetched and fourth batch should be skipped. Max in-flight records
+        // limit is reached.
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+        assertEquals(4, sharePartition.cachedState().get(0L).lastOffset());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(5L).lastOffset());
+        assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
+        assertEquals(24, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(25, sharePartition.nextFetchOffset());
+
+        // Release middle batch.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        // Validate the nextFetchOffset is updated to 5.
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        // The complete released batch should be acquired but not any other batch as the lastOffset
+        // is adjusted according to the minimum of fetched batch and endOffset.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                500 /* Max fetch records */,
+                5 /* Fetch Offset */,
+                fetchPartitionData(memoryRecords(5, 10), 0),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        // Validate 1 batch is fetched, with 10 records till end of batch.
+        assertArrayEquals(expectedAcquiredRecord(5, 14, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(25, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireWithMaxInFlightRecordsReleaseBatchAndAcquireSubsetRecordsOverlap() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .withMaxInflightRecords(20)
+            .build();
+
+        // Acquire records, should be acquired till maxInFlightRecords i.e. 25 records till 24 offset.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 10);
+
+        // Validate 4 batches are fetched and any further batch should be skipped. Max in-flight records
+        // limit is reached.
+        assertEquals(4, sharePartition.cachedState().size());
+        assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+        assertEquals(4, sharePartition.cachedState().get(0L).lastOffset());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(9, sharePartition.cachedState().get(5L).lastOffset());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
+        assertEquals(24, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(25, sharePartition.nextFetchOffset());
+
+        // Release only 1 middle batch.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 9, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        // Validate the nextFetchOffset is updated to 5.
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        // Adjust the max fetch records to 6 so it's just 1 record more than the released batch size.
+        // This shall not impact the acquired records as only the released batch should be acquired.
+        // However, this previously caused an issue where the subset of records were acquired from the
+        // next batch due to incorrect calculation.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                6 /* Max fetch records */,
+                5 /* Fetch Offset */,
+                fetchPartitionData(memoryRecords(5, 5), 0),
+                FETCH_ISOLATION_HWM),
+            5);
+
+        // Validate 1 batch is fetched, with 5 records till end of batch.
+        assertArrayEquals(expectedAcquiredRecord(5, 9, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(25, sharePartition.nextFetchOffset());
+    }
+
     @Test
     public void testNextFetchOffsetInitialState() {
        SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();

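For readers following the gap-handling comment added to SharePartition.java above, here is a small
self-contained sketch of the described computation. The class and method names are hypothetical and
the structure is simplified; it is not the production code, only an illustration of the [0-9, 21-30]
example with a gap from 10 to 20:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Hypothetical sketch of the gap computation described in the comment added to
    // SharePartition.java above; names and structure are illustrative only.
    public class GapComputationSketch {

        // Cached in-flight batches keyed by base offset, value = last offset of the batch.
        static List<long[]> gapsToAcquire(NavigableMap<Long, Long> cached, long fetchFirst, long fetchLast) {
            List<long[]> gaps = new ArrayList<>();
            long next = fetchFirst;
            for (Map.Entry<Long, Long> entry : cached.subMap(fetchFirst, true, fetchLast, true).entrySet()) {
                if (next < entry.getKey()) {
                    // A gap exists before this cached batch; it is bounded by entry.getKey() - 1
                    // because the sub map contains the batch that follows the gap.
                    gaps.add(new long[]{next, Math.min(entry.getKey() - 1, fetchLast)});
                }
                next = entry.getValue() + 1;
            }
            return gaps;
        }

        public static void main(String[] args) {
            NavigableMap<Long, Long> cached = new TreeMap<>();
            cached.put(0L, 9L);   // cached batch 0-9
            cached.put(21L, 30L); // cached batch 21-30, leaving a gap from 10 to 20
            // Fetch 0-15: the sub map has a single entry (0-9), so no gap is emitted here;
            // offsets 10-15 are acquired later in the flow.
            System.out.println(gapsToAcquire(cached, 0L, 15L).size()); // 0
            // Fetch 0-25: the sub map has two entries, so the gap 10-20 is detected.
            long[] gap = gapsToAcquire(cached, 0L, 25L).get(0);
            System.out.println(gap[0] + "-" + gap[1]); // 10-20
        }
    }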