This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 49ee1fb4f9d KAFKA-19632: Handle overlap batch on partition re-assignment (#20395)
49ee1fb4f9d is described below

commit 49ee1fb4f9deb326d82b86d3a047182f7d96ddfa
Author: Apoorv Mittal <apoorvmitta...@gmail.com>
AuthorDate: Tue Aug 26 13:25:57 2025 +0100

    KAFKA-19632: Handle overlap batch on partition re-assignment (#20395)
    
    The PR fixes a batch alignment issue when partitions are re-assigned.
    During the initial read of state, the cached batches can be broken
    arbitrarily. Say the start offset is 10 and the cache contains the
    batch [15-18] during initialization. When a fetch happens at offset 10
    and the fetched batch contains 10 records, i.e. [10-19], then correct
    batches will be created if maxFetchRecords is greater than 10. But if
    maxFetchRecords is less than 10, then the last offset of the batch is
    determined, which will be 19. Hence the acquire method will
    incorrectly create a batch of [10-19] while [15-18] already exists.
    The check below is required to resolve the issue:
    ```
    if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
        lastAcquiredOffset = lastOffset;
    }
    ```
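
    For illustration only, a minimal, self-contained sketch of the clamping
    behaviour described above (the helper name and signature are
    hypothetical, not the actual SharePartition code):
    ```
    // Sketch: cache already holds [15-18], start offset is 10, the fetched batch is
    // [10-19] and maxFetchRecords < 10, so the last offset eligible for acquisition is 14.
    static long lastOffsetToAcquire(boolean initialReadGapWindowActive,
                                    long lastAcquiredOffset, long lastOffset) {
        if (initialReadGapWindowActive && lastAcquiredOffset > lastOffset) {
            return lastOffset; // e.g. 19 clamped to 14, avoiding overlap with [15-18]
        }
        return lastAcquiredOffset;
    }
    ```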
    
    While testing other cases, further issues were found with updating the
    gap offset, with acquiring records prior to the share partition's end
    offset, and with determining the next fetch offset for compacted
    topics. All of these issues arise mainly during the initial read
    window after partition re-assignment.
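
    As a rough illustration of the next-fetch-offset fix (a hypothetical
    helper, not the SharePartition implementation), the gap start offset is
    advanced with a max so it never moves backwards past a cached batch:
    ```
    // Sketch: while scanning cached in-flight batches during the initial read gap
    // window, an earlier cached batch (e.g. 5-6, lastOffset + 1 = 7) must not pull
    // gapStartOffset back once it has already advanced to 12.
    static long advanceGapStart(long gapStartOffset, long cachedBatchLastOffset) {
        return Math.max(cachedBatchLastOffset + 1, gapStartOffset);
    }
    ```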
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>, Abhinav Dixit
     <adi...@confluent.io>, Chirag Wadhwa <cwad...@confluent.io>
---
 .../java/kafka/server/share/SharePartition.java    |  92 ++-
 .../kafka/server/share/SharePartitionTest.java     | 666 +++++++++++++++++++++
 .../acknowledge/ShareAcknowledgementBatch.java     |   3 +
 3 files changed, 749 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index a07f9a12fbb..08a9539dbed 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -572,7 +572,16 @@ public class SharePartition {
                         nextFetchOffset = gapStartOffset;
                         break;
                     }
-                    gapStartOffset = entry.getValue().lastOffset() + 1;
+                    // If the gapStartOffset is already past the last offset of the in-flight batch,
+                    // then do not consider this batch for finding the next fetch offset. For example,
+                    // consider during initialization, the initialReadGapOffset is set to 5 and the
+                    // first cached batch is 15-18. First read will happen at offset 5 and say the data
+                    // fetched is [5-6], now next fetch offset should be 7. This works fine but say
+                    // subsequent read returns batch 8-11, and the gapStartOffset will be 12. Without
+                    // the max check, the next fetch offset returned will be 7 which is incorrect.
+                    // The natural gaps for which no data is available shall be considered hence
+                    // take the max of the gapStartOffset and the last offset of the in-flight batch.
+                    gapStartOffset = Math.max(entry.getValue().lastOffset() + 1, gapStartOffset);
                 }
 
                 // Check if the state is maintained per offset or batch. If 
the offsetState
@@ -699,16 +708,33 @@ public class SharePartition {
 
             // Find the floor batch record for the request batch. The request batch could be
             // for a subset of the in-flight batch i.e. cached batch of offset 10-14 and request batch
-            // of 12-13. Hence, floor entry is fetched to find the sub-map.
+            // of 12-13. Hence, floor entry is fetched to find the sub-map. Secondly, when the share
+            // partition is initialized with persisted state, the start offset might be moved to a later
+            // offset. In such case, the first batch base offset might be less than the start offset.
             Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(baseOffset);
-            // We might find a batch with floor entry but not necessarily that batch has an overlap,
-            // if the request batch base offset is ahead of last offset from floor entry i.e. cached
-            // batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap.
-            // Such scenario will be handled in the next step when considering the subMap. However,
-            // if the floor entry is found and the request batch base offset is within the floor entry
-            // then adjust the base offset to the floor entry so that acquire method can still work on
-            // previously cached batch boundaries.
-            if (floorEntry != null && floorEntry.getValue().lastOffset() >= baseOffset) {
+            if (floorEntry == null) {
+                // The initialize method check that there couldn't be any batches prior to the start offset.
+                // And once share partition starts fetching records, it will always fetch records, at least,
+                // from the start offset, but there could be cases where the batch base offset is prior
+                // to the start offset. This can happen when the share partition is initialized with
+                // partial persisted state and moved start offset i.e. start offset is not the batch's
+                // first offset. In such case, we need to adjust the base offset to the start offset.
+                // It's safe to adjust the base offset to the start offset when there isn't any floor
+                // i.e. no cached batches available prior to the request batch base offset. Hence,
+                // check for the floor entry and adjust the base offset accordingly.
+                if (baseOffset < startOffset) {
+                    log.info("Adjusting base offset for the fetch as it's prior to start offset: {}-{}"
+                            + "from {} to {}", groupId, topicIdPartition, baseOffset, startOffset);
+                    baseOffset = startOffset;
+                }
+            } else if (floorEntry.getValue().lastOffset() >= baseOffset) {
+                // We might find a batch with floor entry but not necessarily that batch has an overlap,
+                // if the request batch base offset is ahead of last offset from floor entry i.e. cached
+                // batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap.
+                // Such scenario will be handled in the next step when considering the subMap. However,
+                // if the floor entry is found and the request batch base offset is within the floor entry
+                // then adjust the base offset to the floor entry so that acquire method can still work on
+                // previously cached batch boundaries.
                baseOffset = floorEntry.getKey();
            }
             // Validate if the fetch records are already part of existing batches and if available.
@@ -755,7 +781,8 @@ public class SharePartition {
                         result.addAll(shareAcquiredRecords.acquiredRecords());
                         acquiredCount += shareAcquiredRecords.count();
                     }
-                    // Set nextBatchStartOffset as the last offset of the current in-flight batch + 1
+                    // Set nextBatchStartOffset as the last offset of the current in-flight batch + 1.
+                    // Hence, after the loop iteration the next gap can be considered.
                     maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
                    // If the acquired count is equal to the max fetch records then break the loop.
                     if (acquiredCount >= maxRecordsToAcquire) {
@@ -1057,10 +1084,24 @@ public class SharePartition {
     /**
      * Updates the cached state, start and end offsets of the share partition as per the new log
      * start offset. The method is called when the log start offset is moved for the share partition.
+     * <p>
+     * This method only archives the available records in the cached state that are before the new log
+     * start offset. It does not persist the archived state batches to the persister, rather it
+     * updates the cached state and offsets to reflect the new log start offset. The state in persister
+     * will be updated lazily during the acknowledge/release records API calls or acquisition lock timeout.
+     * <p>
+     * The AVAILABLE state records can either have ongoing state transition or not. Hence, the archive
+     * records method will update the state of the records to ARCHIVED and set the terminal state flag
+     * hence if the transition is rolled back then the state will not be AVAILABLE again. However,
+     * the ACQUIRED state records will not be archived as they are still in-flight and acknowledge
+     * method also do not allow the state update for any offsets post the log start offset, hence those
+     * records will only be archived once acquisition lock timeout occurs.
      *
      * @param logStartOffset The new log start offset.
      */
     void updateCacheAndOffsets(long logStartOffset) {
+        log.debug("Updating cached states for share partition: {}-{} with new log start offset: {}",
+            groupId, topicIdPartition, logStartOffset);
         lock.writeLock().lock();
         try {
             if (logStartOffset <= startOffset) {
@@ -1432,7 +1473,11 @@ public class SharePartition {
         lock.writeLock().lock();
         try {
             if (initialReadGapOffset != null) {
-                if (initialReadGapOffset.endOffset() == endOffset) {
+                // When last cached batch for initial read gap window is acquired, then endOffset is
+                // same as the initialReadGapOffset's endOffset, but the gap offset to update is
+                // endOffset + 1. Hence, do not update the gap start offset if the request offset
+                // is ahead of the endOffset.
+                if (initialReadGapOffset.endOffset() == endOffset && offset <= initialReadGapOffset.endOffset()) {
                    initialReadGapOffset.gapStartOffset(offset);
                } else {
                    // The initial read gap offset is not valid anymore as the end offset has moved
@@ -1445,6 +1490,15 @@ public class SharePartition {
         }
     }
 
+    /**
+     * The method calculates the last offset and maximum records to acquire. The adjustment is needed
+     * to ensure that the records acquired do not exceed the maximum in-flight messages limit.
+     *
+     * @param fetchOffset The offset from which the records are fetched.
+     * @param maxFetchRecords The maximum number of records to acquire.
+     * @param lastOffset The last offset to acquire records to, which is the last offset of the fetched batch.
+     * @return LastOffsetAndMaxRecords object, containing the last offset to acquire and the maximum records to acquire.
+     */
    private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
        // There can always be records fetched exceeding the max in-flight messages limit. Hence,
        // we need to check if the share partition has reached the max in-flight messages limit
@@ -1512,6 +1566,20 @@ public class SharePartition {
                // which falls under the max messages limit. As the max fetch records is the soft
                // limit, the last offset can be higher than the max messages.
                lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
+                // If the initial read gap offset window is active then it's not guaranteed that the
+                // batches align on batch boundaries. Hence, reset to last offset itself if the batch's
+                // last offset is greater than the last offset for acquisition, else there could be
+                // a situation where the batch overlaps with the initial read gap offset window batch.
+                // For example, if the initial read gap offset window is 10-30 i.e. initialReadGapOffset's
+                // startOffset is 10 and endOffset is 30, and the first persister's read batch is 15-30.
+                // Say first fetched batch from log is 10-30 and maxFetchRecords is 1, then the lastOffset
+                // in this method call would be 14. As the maxFetchRecords is lesser than the batch,
+                // hence last batch offset for request offset is fetched. In this example it will
+                // be 30, hence check if the initial read gap offset window is active and the last acquired
+                // offset should be adjusted to 14 instead of 30.
+                if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) {
+                    lastAcquiredOffset = lastOffset;
+                }
             }
 
             // Create batches of acquired records.
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index ba24f3b2595..75f27ec5382 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -1088,6 +1088,672 @@ public class SharePartitionTest {
         assertNull(initialReadGapOffset);
     }
 
+    @Test
+    public void testMaybeInitializeAndAcquire() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that covers the entire range from 10 
to 30 of initial read gap.
+        // The records in the batch are from 10 to 49.
+        MemoryRecords records = memoryRecords(40, 10);
+        // Set max fetch records to 1, records will be acquired till the first 
gap is encountered.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                1,
+                10,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecord(10, 14, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(10L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(15L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+
+        // Send the same batch again to acquire the next set of records.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                10,
+                15,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            13);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(31, sharePartition.nextFetchOffset());
+        assertEquals(6, sharePartition.cachedState().size());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(19L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(19L).offsetState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(23L).batchDeliveryCount());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        // As all the gaps are now filled, the initialReadGapOffset should be 
null.
+        assertNull(sharePartition.initialReadGapOffset());
+
+        // Now initial read gap is filled, so the complete batch can be 
acquired despite max fetch records being 1.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                1,
+                31,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            19);
+
+        assertArrayEquals(expectedAcquiredRecord(31, 49, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(50, sharePartition.nextFetchOffset());
+        assertEquals(7, sharePartition.cachedState().size());
+        assertEquals(31, sharePartition.cachedState().get(31L).firstOffset());
+        assertEquals(49, sharePartition.cachedState().get(31L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(31L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(31L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(31L).offsetState());
+        assertEquals(49L, sharePartition.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that covers the entire range from 10 
to 30 of initial read gap.
+        // The records in the batch are from 10 to 49.
+        MemoryRecords records = memoryRecords(40, 10);
+        // Set max fetch records to 500, all records should be acquired.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                500,
+                10,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            37);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(10, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(50, sharePartition.nextFetchOffset());
+        assertEquals(7, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+        assertEquals(31, sharePartition.cachedState().get(31L).firstOffset());
+        assertEquals(49, sharePartition.cachedState().get(31L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(31L).batchState());
+        assertEquals(49L, sharePartition.endOffset());
+        // As all the gaps are now filled, the initialReadGapOffset should be 
null.
+        assertNull(sharePartition.initialReadGapOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithFetchBatchLastOffsetWithinCachedBatch() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that ends in between the cached batch 
and the fetch offset is
+        // post startOffset.
+        MemoryRecords records = memoryRecords(16, 12);
+        // Set max fetch records to 500, records should be acquired till the 
last offset of the fetched batch.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                500,
+                10,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            13);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(12, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+        assertEquals(6, sharePartition.cachedState().size());
+        assertEquals(12, sharePartition.cachedState().get(12L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(12L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(12L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).offsetState().get(26L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).offsetState().get(27L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(28L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(29L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(30L).state());
+        assertEquals(30L, sharePartition.endOffset());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(28L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithFetchBatchPriorStartOffset() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record where first offset is prior 
startOffset.
+        MemoryRecords records = memoryRecords(16, 6);
+        // Set max fetch records to 500, records should be acquired till the 
last offset of the fetched batch.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                500,
+                10,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(10, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(23, sharePartition.nextFetchOffset());
+        assertEquals(5, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(20L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithMultipleBatches() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(5L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create multiple batch records that covers the entire range from 5 
to 30 of initial read gap.
+        // The records in the batch are from 5 to 49.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 2, 5).close();
+        memoryRecordsBuilder(buffer, 1, 8).close();
+        memoryRecordsBuilder(buffer, 2, 10).close();
+        memoryRecordsBuilder(buffer, 6, 13).close();
+        memoryRecordsBuilder(buffer, 3, 19).close();
+        memoryRecordsBuilder(buffer, 9, 22).close();
+        memoryRecordsBuilder(buffer, 19, 31).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Set max fetch records to 1, records will be acquired till the first 
gap is encountered.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                1,
+                5L,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            2);
+
+        assertArrayEquals(expectedAcquiredRecord(5, 6, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(7, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+        assertEquals(6, sharePartition.cachedState().get(5L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(5L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(5L).offsetState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(7L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+
+        // Remove first batch from the records as the fetch offset has moved 
forward to 7 offset.
+        List<RecordBatch> batch = TestUtils.toList(records.batches());
+        records = records.slice(batch.get(0).sizeInBytes(), 
records.sizeInBytes() - batch.get(0).sizeInBytes());
+        // Send the batch again to acquire the next set of records.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                3,
+                7L,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            4);
+
+        assertArrayEquals(expectedAcquiredRecord(8, 11, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(5, sharePartition.cachedState().size());
+        assertEquals(8, sharePartition.cachedState().get(8L).firstOffset());
+        assertEquals(11, sharePartition.cachedState().get(8L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(8L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(8L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(8L).offsetState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(12L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+
+        // Remove the next 2 batches from the records as the fetch offset has 
moved forward to 12 offset.
+        int size = batch.get(1).sizeInBytes() + batch.get(2).sizeInBytes();
+        records = records.slice(size, records.sizeInBytes() - size);
+        // Send the records with 8 as max fetch records to acquire new and 
existing cached batches.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                8,
+                12,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(13, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(26, sharePartition.nextFetchOffset());
+        assertEquals(8, sharePartition.cachedState().size());
+        assertEquals(13, sharePartition.cachedState().get(13L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(13L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(13L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(26L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+
+        // Remove the next 2 batches from the records as the fetch offset has 
moved forward to 26 offset.
+        // Do not remove the 5th batch as it's only partially acquired.
+        size = batch.get(3).sizeInBytes() + batch.get(4).sizeInBytes();
+        records = records.slice(size, records.sizeInBytes() - size);
+        // Send the records with 10 as max fetch records to acquire the 
existing and till end of the
+        // fetched data.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                10,
+                26,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            24);
+
+        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26, 
30, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(50, sharePartition.nextFetchOffset());
+        assertEquals(9, sharePartition.cachedState().size());
+        assertEquals(31, sharePartition.cachedState().get(31L).firstOffset());
+        assertEquals(49, sharePartition.cachedState().get(31L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(31L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(49L, sharePartition.endOffset());
+        // As all the gaps are now filled, the initialReadGapOffset should be 
null.
+        assertNull(sharePartition.initialReadGapOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithMultipleBatchesAndLastOffsetWithinCachedBatch() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(5L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create multiple batch records that ends in between the cached batch 
and the fetch offset is
+        // post startOffset.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 2, 7).close();
+        memoryRecordsBuilder(buffer, 2, 10).close();
+        memoryRecordsBuilder(buffer, 6, 13).close();
+        // Though 19 offset is a gap but still be acquired.
+        memoryRecordsBuilder(buffer, 8, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Set max fetch records to 500, records should be acquired till the 
last offset of the fetched batch.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                500,
+                5,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            18);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(7, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+        assertEquals(6, sharePartition.cachedState().size());
+        assertEquals(7, sharePartition.cachedState().get(7L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(7L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(7L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).offsetState().get(26L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).offsetState().get(27L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(28L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(29L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(30L).state());
+        assertEquals(30L, sharePartition.endOffset());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(28L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithMultipleBatchesPriorStartOffset() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create multiple batch records where multiple batches base offsets 
are prior startOffset.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 2, 3).close();
+        memoryRecordsBuilder(buffer, 1, 6).close();
+        memoryRecordsBuilder(buffer, 4, 8).close();
+        memoryRecordsBuilder(buffer, 10, 13).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Set max fetch records to 500, records should be acquired till the 
last offset of the fetched batch.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                500,
+                10,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            10);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(10, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(23, sharePartition.nextFetchOffset());
+        assertEquals(5, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(20L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+    }
+
     @Test
     public void testAcquireSingleRecord() throws InterruptedException {
         SharePartition sharePartition = SharePartitionBuilder.builder()
diff --git a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
index bc29f37c62f..96efa5eb29e 100644
--- a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
+++ b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
@@ -24,6 +24,9 @@ import java.util.List;
  * The class abstracts the acknowledgement request for <code>SharePartition</code> class constructed
  * from {@link org.apache.kafka.common.message.ShareFetchRequestData.AcknowledgementBatch} and
  * {@link org.apache.kafka.common.message.ShareAcknowledgeRequestData.AcknowledgementBatch} classes.
+ * <p>
+ * Acknowledge types are represented as a list of bytes, where each byte corresponds to an acknowledge
+ * type defined in {@link org.apache.kafka.clients.consumer.AcknowledgeType}.
  */
 public record ShareAcknowledgementBatch(
     long firstOffset,
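
For a hedged illustration of the documented acknowledge types (assuming the record's
remaining components are lastOffset and acknowledgeTypes, which are not shown in this
excerpt, and that ACCEPT/RELEASE/REJECT conventionally map to bytes 1/2/3 in
org.apache.kafka.clients.consumer.AcknowledgeType):

```
import java.util.List;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;

public class ShareAckSketch {
    public static void main(String[] args) {
        // One acknowledge type byte per offset in [10, 12]: accept, release, accept.
        ShareAcknowledgementBatch batch = new ShareAcknowledgementBatch(
            10L,                                   // firstOffset
            12L,                                   // lastOffset (assumed component)
            List.of((byte) 1, (byte) 2, (byte) 1)  // acknowledge types (assumed component)
        );
        System.out.println(batch);
    }
}
```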
