This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new cf579edd268 KAFKA-19954: Fixing overlapping batches post re-initialization of Share Partition (#21051)
cf579edd268 is described below

commit cf579edd2680e0cc3031bc200be35c1490ab53d8
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Dec 2 20:50:33 2025 +0000

    KAFKA-19954: Fixing overlapping batches post re-initialization of Share Partition (#21051)
    
    The PR fixes an issue where a re-initialized SharePartition starts
    fetching prior to the start offset. In edge cases, the `acquire` method
    then creates overlapping batches in the cache. The issue is most
    evident in `record_limit` mode.
    
    Consider a scenario where the log has a single 0-99 batch and, post
    re-initialization of the share partition, the start offset becomes 5
    because the first 5 records were already acknowledged by the previous
    share-partition instance. Prior to the fix:
    - A request arrives fetching the next 5 offsets, hence the cache holds
    5-99, with offsets 5-9 acquired.
    - The client acknowledges offsets 5-9, and the start offset moves to 10.
    - The client re-fetches from offset 10 and gets the same 0-99 batch.
    - Acquire readjusts the base offset to 10 as the start offset has moved.
    - There is no overlapping batch in the cache for 10-99 as the key in
    the cache is 5.
    - Hence, a 0-99 cache entry is created, overlapping the existing 5-99 entry.
    
    Post Fix:
    - The overlapping entry is now found in the cache because the lookup
    starts from the minimum of the adjusted baseOffset and the batch's
    firstOffset (a sketch of this lookup follows below).
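    
    A minimal, hedged sketch of the lookup change (standalone Java, not the
    actual SharePartition code: the cache is modelled as a plain NavigableMap
    from batch base offset to batch last offset, and names such as cachedState,
    baseOffset and lastOffsetToAcquire only mirror the real fields):
    
        import java.util.NavigableMap;
        import java.util.TreeMap;
    
        public class OverlapLookupSketch {
            public static void main(String[] args) {
                // Cached in-flight batches keyed by base offset; value = last offset.
                NavigableMap<Long, Long> cachedState = new TreeMap<>();
                cachedState.put(5L, 99L); // 5-99 entry left from the previous instance
    
                long firstBatchBaseOffset = 0L;  // the fetched log batch still starts at 0
                long startOffset = 10L;          // start offset after acknowledging 5-9
                long lastOffsetToAcquire = 99L;
    
                // acquire() moves the base offset forward to the start offset.
                long baseOffset = Math.max(firstBatchBaseOffset, startOffset);
    
                // Pre-fix lookup: subMap(10, 99) misses the entry keyed at 5, so the
                // fetched 0-99 batch appears to have no overlap and a duplicate,
                // overlapping entry gets created.
                boolean overlapPreFix =
                    !cachedState.subMap(baseOffset, true, lastOffsetToAcquire, true).isEmpty();
    
                // Post-fix lookup: start from min(batch first offset, adjusted baseOffset),
                // so the entry keyed at 5 is found and no overlapping entry is created.
                boolean overlapPostFix =
                    !cachedState.subMap(Math.min(firstBatchBaseOffset, baseOffset), true,
                        lastOffsetToAcquire, true).isEmpty();
    
                System.out.println("overlap pre-fix:  " + overlapPreFix);  // false
                System.out.println("overlap post-fix: " + overlapPostFix); // true
            }
        }
    
    In the real code the values are InFlightBatch objects and baseOffset may
    also be pulled back to a floor entry, but the min() guard behaves the same.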
    
    Also fixed other scenarios where, due to a startOffset move and no cache
    overlap, offsets could be acquired prior to startOffset (see the second
    sketch below).
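    
    A similarly hedged sketch of the start-offset guard (an illustrative
    helper only, not an actual SharePartition method):
    
        import java.util.OptionalLong;
    
        public class AcquireStartOffsetGuardSketch {
    
            // First offset that may be acquired from a fetched batch, or empty when
            // the whole batch is before the start offset (names are illustrative).
            static OptionalLong firstOffsetToAcquire(long batchFirstOffset, long batchLastOffset,
                                                     long startOffset) {
                if (batchLastOffset < startOffset) {
                    // Fetch batch is entirely prior to the start offset, nothing to acquire.
                    return OptionalLong.empty();
                }
                // With no overlapping cache entry, acquisition still starts at the adjusted
                // base offset (never before startOffset), not at the batch's first offset.
                return OptionalLong.of(Math.max(batchFirstOffset, startOffset));
            }
    
            public static void main(String[] args) {
                System.out.println(firstOffsetToAcquire(0, 4, 5));   // empty: batch 0-4 is before startOffset 5
                System.out.println(firstOffsetToAcquire(0, 10, 5));  // OptionalLong[5]: acquisition starts at 5, not 0
            }
        }
    
    This mirrors the new early return when lastBatch.lastOffset() < startOffset()
    and the switch from firstBatch.baseOffset() to the adjusted baseOffset when
    no overlapping cache entry exists.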
    
    Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  27 ++-
 .../kafka/server/share/SharePartitionTest.java     | 210 +++++++++++++++++++++
 2 files changed, 234 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 23a85611719..c6eb76129a6 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -736,6 +736,14 @@ public class SharePartition {
             return ShareAcquiredRecords.empty();
         }
 
+        // There shouldn't be any case where the fetch batch is prior to the start offset, as the
+        // fetch offset should have moved past the start offset. However, this check ensures that
+        // no records are acquired prior to the start offset.
+        if (lastBatch.lastOffset() < startOffset()) {
+            // Fetch batch is prior to start offset, nothing to acquire.
+            return ShareAcquiredRecords.empty();
+        }
+
         LastOffsetAndMaxRecords lastOffsetAndMaxRecords = lastOffsetAndMaxRecordsToAcquire(fetchOffset,
             maxFetchRecords, lastBatch.lastOffset());
         if (lastOffsetAndMaxRecords.maxRecords() <= 0) {
@@ -744,7 +752,7 @@ public class SharePartition {
         // The lastOffsetAndMaxRecords contains the last offset to acquire and the maximum number of records
         // to acquire.
         int maxRecordsToAcquire = lastOffsetAndMaxRecords.maxRecords();
-        long lastOffsetToAcquire = lastOffsetAndMaxRecords.lastOffset();
+        final long lastOffsetToAcquire = lastOffsetAndMaxRecords.lastOffset();
 
         // We require the first batch of records to get the base offset. Stop parsing further
         // batches.
@@ -788,7 +796,12 @@ public class SharePartition {
                 baseOffset = floorEntry.getKey();
             }
             // Validate if the fetch records are already part of existing batches and if available.
-            NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(baseOffset, true, lastOffsetToAcquire, true);
+            // The sub map is used to find the overlapping batches in the cache for the request batch.
+            // However, baseOffset might have been adjusted above: it could have moved ahead to align
+            // with startOffset or moved back to align with the floor entry. Hence compute the min of
+            // the first batch's base offset and the adjusted base offset.
+            final NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(
+                Math.min(firstBatch.baseOffset(), baseOffset), true, lastOffsetToAcquire, true);
             // No overlap with request offsets in the cache for in-flight records. Acquire the complete
             // batch.
             if (subMap.isEmpty()) {
@@ -796,8 +809,16 @@ public class SharePartition {
                     groupId, topicIdPartition);
                 // It's safe to use lastOffsetToAcquire instead of lastBatch.lastOffset() because there is no
                 // overlap hence the lastOffsetToAcquire is same as lastBatch.lastOffset() or before that.
+                // Also, the first offset to acquire should be baseOffset. The baseOffset could be adjusted
+                // either prior to the fetch batch's base offset, in which case there has to be a submap
+                // entry and hence the current code path shall not be executed, or the baseOffset is adjusted
+                // past the batch's base offset, to startOffset, in which case acquire should honour the
+                // adjusted baseOffset. Consider the persister returns 5 as startOffset and a batch of 15-20
+                // in ARCHIVED state. The fetch returns 0-10 as the first batch, then the baseOffset
+                // is adjusted to 5, the startOffset. As there is no cached batch from 0-10, the
+                // submap will be empty and the first offset to acquire should be 5, not 0.
                 ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), isRecordLimitMode,
-                    firstBatch.baseOffset(), lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
+                    baseOffset, lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
                 return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
             }
 
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index b8e2d1f81ad..f84f64afdb1 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -2520,6 +2520,109 @@ public class SharePartitionTest {
         assertEquals(25, sharePartition.nextFetchOffset());
     }
 
+    @Test
+    public void testAcquireBatchPriorToStartOffset() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(5L, 99L, RecordState.AVAILABLE.id, (short) 1)
+                    ))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
+        sharePartition.maybeInitialize();
+
+        // Validate the cached state after initialization.
+        assertEquals(5, sharePartition.nextFetchOffset());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
+
+        // Acquire offsets prior to start offset. Should not acquire any records.
+        fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.RECORD_LIMIT,
+                10,
+                10,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData(memoryRecords(0, 5)),
+                FETCH_ISOLATION_HWM),
+            0);
+
+        // Validate the cached state remains unchanged.
+        assertEquals(5, sharePartition.nextFetchOffset());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
+    }
+
+    /**
+     * Test validates the scenario where the partition has been re-initialized, the first fetch batch
+     * has the start offset in its middle, and there is no overlap with the batch returned from the
+     * persister during initialization. In this case, the acquire logic should respect the start offset
+     * and not allow acquiring records prior to the start offset.
+     */
+    @Test
+    public void testAcquireBatchWithMovedStartOffsetAndNoOverlapWithCachedBatch() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 20L, RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
+        sharePartition.maybeInitialize();
+
+        // Validate the cached state after initialization.
+        assertEquals(5, sharePartition.nextFetchOffset());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
+        assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(15L).batchState());
+        // As there is a gap between offsets 5-14, a gap window should be created.
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(5, sharePartition.persisterReadResultGapWindow().gapStartOffset());
+
+        // Acquire offsets starting prior to the start offset and going beyond it. Only offsets 5-9
+        // should be acquired.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.RECORD_LIMIT,
+                10,
+                10,
+                5L,
+                fetchPartitionData(memoryRecords(0, 10)),
+                FETCH_ISOLATION_HWM),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecord(5, 9, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(2, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.nextFetchOffset());
+        assertEquals(5L, sharePartition.startOffset());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(9, sharePartition.cachedState().get(5L).lastOffset());
+        assertNotNull(sharePartition.persisterReadResultGapWindow());
+        assertEquals(10, sharePartition.persisterReadResultGapWindow().gapStartOffset());
+    }
+
     @Test
     public void testNextFetchOffsetInitialState() {
         SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
@@ -10635,6 +10738,113 @@ public class SharePartitionTest {
         assertEquals(5, sharePartitionMetrics.inFlightBatchMessageCount().max());
     }
 
+    /**
+     * Test validates the scenario where the partition has been re-initialized with a moved start offset.
+     * In this case, the acquire logic should respect the new start offset and not allow acquiring
+     * records before the new start offset. The test simulates a scenario where the log batch covers
+     * offsets 0-99, but post re-initialization the start offset is moved to 5, hence the share partition
+     * should have offsets 5-99 in AVAILABLE state in the cache. Post acquire and acknowledge of offsets
+     * 5-14, the start offset is moved to 15. The next acquire on the same log batch should only allow
+     * acquiring offsets from offset 15 and should not create any other entries in the cache.
+     */
+    @Test
+    public void testAcquireBatchInRecordLimitModeWithMovedStartOffset() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(5L, 99L, RecordState.AVAILABLE.id, (short) 1)
+                    ))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withSharePartitionMetrics(sharePartitionMetrics)
+            .build();
+        sharePartition.maybeInitialize();
+
+        // Validate the cached state after initialization.
+        assertEquals(5, sharePartition.nextFetchOffset());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
+
+        // Send the batch of 0-99 offsets for acquire, only 5-14 should be acquired. Though the log
+        // batch base offset is 0, acquire should respect the share partition start offset of 5.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.RECORD_LIMIT,
+                10,
+                10,
+                5L,
+                fetchPartitionData(memoryRecords(0, 100)),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecords(5, 14, 2).toArray(), acquiredRecordsList.toArray());
+
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(5L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+
+        // Offset state should be maintained since only some offsets in the batch are acquired.
+        assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(5L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).offsetState().get(15L).state());
+
+        // Acknowledge the acquired offsets 5-14 so the start offset moves to 15.
+        WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
+        Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));
+        Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(5, 14, List.of(AcknowledgeType.ACCEPT.id))
+        ));
+
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(15L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+
+        assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).offsetState().get(15L).state());
+
+        // Re-acquire on the same log batch of 0-99 offsets, only 15-24 should be acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                ShareAcquireMode.RECORD_LIMIT,
+                10,
+                10,
+                15,
+                fetchPartitionData(memoryRecords(0, 100)),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecords(15, 24, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(25, sharePartition.nextFetchOffset());
+        assertEquals(15L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+
+        assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(15L).state());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).offsetState().get(24L).state());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).offsetState().get(25L).state());
+    }
+
     @Test
     public void testAcknowledgeInRecordLimitMode() {
         Persister persister = Mockito.mock(Persister.class);
