This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 01587d09d82 KAFKA-18494-3: solution for the bug relating to gaps in 
the share partition cachedStates post initialization (#18696)
01587d09d82 is described below

commit 01587d09d82861144f57ba87067c01e72329a127
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Wed Feb 5 20:46:25 2025 +0530

    KAFKA-18494-3: solution for the bug relating to gaps in the share partition 
cachedStates post initialization (#18696)
    
    Reviewers: Apoorv Mittal <[email protected]>, Abhinav Dixit 
<[email protected]>, Andrew Schofield <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  161 ++-
 .../kafka/server/share/SharePartitionTest.java     | 1350 +++++++++++++++++---
 2 files changed, 1352 insertions(+), 159 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 65dd7374720..2ba8574f785 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -287,6 +287,12 @@ public class SharePartition {
      */
     private long endOffset;
 
+    /**
+     * The initial read gap offset tracks if there are any gaps in the 
in-flight batch during initial
+     * read of the share partition state from the persister.
+     */
+    private InitialReadGapOffset initialReadGapOffset;
+
     /**
      * We maintain the latest fetch offset and its metadata to estimate the 
minBytes requirement more efficiently.
      */
@@ -444,6 +450,11 @@ public class SharePartition {
                 stateEpoch = partitionData.stateEpoch();
 
                 List<PersisterStateBatch> stateBatches = 
partitionData.stateBatches();
+                long gapStartOffset = -1;
+                // The previousBatchLastOffset is used to track the last 
offset of the previous batch.
+                // For the first batch that should ideally start from 
startOffset if there are no gaps,
+                // we assume the previousBatchLastOffset to be startOffset - 1.
+                long previousBatchLastOffset = startOffset - 1;
                 for (PersisterStateBatch stateBatch : stateBatches) {
                     if (stateBatch.firstOffset() < startOffset) {
                         log.error("Invalid state batch found for the share 
partition: {}-{}. The base offset: {}"
@@ -452,6 +463,10 @@ public class SharePartition {
                         throwable = new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition));
                         return;
                     }
+                    if (gapStartOffset == -1 && stateBatch.firstOffset() > 
previousBatchLastOffset + 1) {
+                        gapStartOffset = previousBatchLastOffset + 1;
+                    }
+                    previousBatchLastOffset = stateBatch.lastOffset();
                     InFlightBatch inFlightBatch = new 
InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
                         stateBatch.lastOffset(), 
RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), 
null);
                     cachedState.put(stateBatch.firstOffset(), inFlightBatch);
@@ -462,6 +477,10 @@ public class SharePartition {
                     // in the cached state are not missed
                     findNextFetchOffset.set(true);
                     endOffset = 
cachedState.lastEntry().getValue().lastOffset();
+                    // initialReadGapOffset is not required, if there are no 
gaps in the read state response
+                    if (gapStartOffset != -1) {
+                        initialReadGapOffset = new 
InitialReadGapOffset(endOffset, gapStartOffset);
+                    }
                     // In case the persister read state RPC result contains no 
AVAILABLE records, we can update cached state
                     // and start/end offsets.
                     maybeUpdateCachedStateAndOffsets();
@@ -538,8 +557,20 @@ public class SharePartition {
             }
 
             long nextFetchOffset = -1;
-
+            long gapStartOffset = isInitialReadGapOffsetWindowActive() ? 
initialReadGapOffset.gapStartOffset() : -1;
             for (Map.Entry<Long, InFlightBatch> entry : 
cachedState.entrySet()) {
+                // Check if there exists any gap in the in-flight batch which 
needs to be fetched. If
+                // initialReadGapOffset's endOffset is equal to the share 
partition's endOffset, then
+                // only the initial gaps should be considered. Once share 
partition's endOffset is past
+                // initial read end offset then all gaps are anyway fetched.
+                if (isInitialReadGapOffsetWindowActive()) {
+                    if (entry.getKey() > gapStartOffset) {
+                        nextFetchOffset = gapStartOffset;
+                        break;
+                    }
+                    gapStartOffset = entry.getValue().lastOffset() + 1;
+                }
+
                 // Check if the state is maintained per offset or batch. If 
the offsetState
                 // is not maintained then the batch state is used to determine 
the offsets state.
                 if (entry.getValue().offsetState() == null) {
@@ -638,6 +669,9 @@ public class SharePartition {
             List<AcquiredRecords> result = new ArrayList<>();
             // The acquired count is used to track the number of records 
acquired for the request.
             int acquiredCount = 0;
+            // This tracks whether there is a gap between the subMap entries. 
If a gap is found, we will acquire
+            // the corresponding offsets in a separate batch.
+            long maybeGapStartOffset = baseOffset;
             // The fetched records are already part of the in-flight records. 
The records might
             // be available for re-delivery hence try acquiring same. The 
request batches could
             // be an exact match, subset or span over multiple already fetched 
batches.
@@ -648,6 +682,26 @@ public class SharePartition {
                 }
 
                 InFlightBatch inFlightBatch = entry.getValue();
+                // If the initialReadGapOffset window is active, we need to 
treat the gaps in between the window as
+                // acquirable. Once the window is inactive (when we have 
acquired all the gaps inside the window),
+                // the remaining gaps are natural (data does not exist at 
those offsets) and we need not acquire them.
+                if (isInitialReadGapOffsetWindowActive()) {
+                    // If maybeGapStartOffset is less than the key of the 
entry, this means the fetch happened for a gap in the cachedState.
+                    // Thus, a new batch needs to be acquired for the gap.
+                    if (maybeGapStartOffset < entry.getKey()) {
+                        ShareAcquiredRecords shareAcquiredRecords = 
acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                            maybeGapStartOffset, entry.getKey() - 1, 
batchSize, maxFetchRecords);
+                        result.addAll(shareAcquiredRecords.acquiredRecords());
+                        acquiredCount += shareAcquiredRecords.count();
+                    }
+                    // Set maybeGapStartOffset as the last offset of the 
current in-flight batch + 1
+                    maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
+                    // If the acquired count is equal to the max fetch records 
then break the loop.
+                    if (acquiredCount >= maxFetchRecords) {
+                        break;
+                    }
+                }
+
                 // Compute if the batch is a full match.
                 boolean fullMatch = checkForFullMatch(inFlightBatch, 
firstBatch.baseOffset(), lastBatch.lastOffset());
 
@@ -715,6 +769,9 @@ public class SharePartition {
                 result.addAll(shareAcquiredRecords.acquiredRecords());
                 acquiredCount += shareAcquiredRecords.count();
             }
+            if (!result.isEmpty()) {
+                maybeUpdateReadGapFetchOffset(result.get(result.size() - 
1).lastOffset() + 1);
+            }
             return new ShareAcquiredRecords(result, acquiredCount);
         } finally {
             lock.writeLock().unlock();
@@ -1177,6 +1234,24 @@ public class SharePartition {
         };
     }
 
+    // Method to reduce the window that tracks gaps in the cachedState
+    private void maybeUpdateReadGapFetchOffset(long offset) {
+        lock.writeLock().lock();
+        try {
+            if (initialReadGapOffset != null) {
+                if (initialReadGapOffset.endOffset() == endOffset) {
+                    initialReadGapOffset.gapStartOffset(offset);
+                } else {
+                    // The initial read gap offset is not valid anymore as the 
end offset has moved
+                    // beyond the initial read gap offset. Hence, reset the 
initial read gap offset.
+                    initialReadGapOffset = null;
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     private ShareAcquiredRecords acquireNewBatchRecords(
         String memberId,
         Iterable<? extends RecordBatch> batches,
@@ -1212,7 +1287,15 @@ public class SharePartition {
             if (cachedState.firstKey() == firstAcquiredOffset)  {
                 startOffset = firstAcquiredOffset;
             }
-            endOffset = lastAcquiredOffset;
+
+            // If the new batch acquired is part of a gap in the cachedState, 
then endOffset should not be updated.
+            // Ex. if startOffset is 10, endOffset is 30, there is a gap from 
10 to 20, and an inFlight batch from 21 to 30.
+            // In this case, the nextFetchOffset results in 10 and the records 
are fetched. A new batch is acquired from
+            // 10 to 20, but the endOffset remains at 30.
+            if (lastAcquiredOffset > endOffset) {
+                endOffset = lastAcquiredOffset;
+            }
+            maybeUpdateReadGapFetchOffset(lastAcquiredOffset + 1);
             return new ShareAcquiredRecords(acquiredRecords, (int) 
(lastAcquiredOffset - firstAcquiredOffset + 1));
         } finally {
             lock.writeLock().unlock();
@@ -1791,10 +1874,23 @@ public class SharePartition {
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = 
cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of 
entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
                 startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (isInitialReadGapOffsetWindowActive()) {
+                    // This case will arise if we have a situation where there 
is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 
10), (11, 20), (31,40)} and all these batches are acked.
+                    // There is a gap from 21 to 30. Let the 
initialReadGapOffset.gapStartOffset be 21. In this case,
+                    // lastOffsetAcknowledged will be 20, but we cannot simply 
move the start offset to the first offset
+                    // of next cachedState batch (next cachedState batch is 31 
to 40). There is an acquirable gap in between (21 to 30)
+                    // and the startOffset should be at 21. Hence, we set 
startOffset to the minimum of initialReadGapOffset.gapStartOffset
+                    // and higher key of lastOffsetAcknowledged
+                    startOffset = 
Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+                }
                 lastKeyToRemove = entry.getKey();
             } else {
+                // The code will reach this point only if 
lastOffsetAcknowledged is in the middle of some stateBatch. In this case
+                // we can simply move the startOffset to the next offset of 
lastOffsetAcknowledged and need not consider any read gap offsets.
                 startOffset = lastOffsetAcknowledged + 1;
                 if (entry.getKey().equals(cachedState.firstKey())) {
                     // If the first batch in cachedState has some records yet 
to be acknowledged,
@@ -1825,8 +1921,17 @@ public class SharePartition {
 
         NavigableMap.Entry<Long, InFlightBatch> entry = 
cachedState.floorEntry(startOffset);
         if (entry == null) {
-            log.error("The start offset: {} is not found in the cached state 
for share partition: {}-{}."
-                + " Cannot move the start offset.", startOffset, groupId, 
topicIdPartition);
+            // The start offset is not found in the cached state when there is 
a gap starting at the start offset.
+            // For example, if the start offset is 10 and the cached state has 
batches -> { (21, 30), (31, 40) }.
+            // This case arises only when the share partition is initialized 
and the read state response results in
+            // state batches containing gaps. This situation is possible in 
the case where in the previous instance
+            // of this share partition, the gap offsets were fetched but not 
acknowledged, and the next batch of offsets
+            // were fetched as well as acknowledged. In the above example, 
possibly in the previous instance of the share
+            // partition, the batch 10-20 was fetched but not acknowledged and 
the batch 21-30 was fetched and acknowledged.
+            // Thus, the persister has no clue about what happened with the 
batch 10-20. During the re-initialization of
+            // the share partition, the start offset is set to 10 and the 
cached state has the batch 21-30, resulting in a gap.
+            log.debug("The start offset: {} is not found in the cached state 
for share partition: {}-{} " +
+                "as there is an acquirable gap at the beginning. Cannot move 
the start offset.", startOffset, groupId, topicIdPartition);
             return false;
         }
         RecordState startOffsetState = entry.getValue().offsetState == null ?
@@ -1835,6 +1940,10 @@ public class SharePartition {
         return isRecordStateAcknowledged(startOffsetState);
     }
 
+    private boolean isInitialReadGapOffsetWindowActive() {
+        return initialReadGapOffset != null && 
initialReadGapOffset.endOffset() == endOffset;
+    }
+
     /**
      * The record state is considered acknowledged if it is either 
acknowledged or archived.
      * These are terminal states for the record.
@@ -1847,12 +1956,18 @@ public class SharePartition {
         return recordState == RecordState.ACKNOWLEDGED || recordState == 
RecordState.ARCHIVED;
     }
 
-    private long findLastOffsetAcknowledged() {
-        lock.readLock().lock();
+    // Visible for testing
+    long findLastOffsetAcknowledged() {
         long lastOffsetAcknowledged = -1;
+        lock.readLock().lock();
         try {
             for (NavigableMap.Entry<Long, InFlightBatch> entry : 
cachedState.entrySet()) {
                 InFlightBatch inFlightBatch = entry.getValue();
+
+                if (isInitialReadGapOffsetWindowActive() && 
inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
+                    return lastOffsetAcknowledged;
+                }
+
                 if (inFlightBatch.offsetState() == null) {
                     if 
(!isRecordStateAcknowledged(inFlightBatch.batchState())) {
                         return lastOffsetAcknowledged;
@@ -2205,6 +2320,40 @@ public class SharePartition {
         return timer;
     }
 
+    // Visible for testing
+    InitialReadGapOffset initialReadGapOffset() {
+        return initialReadGapOffset;
+    }
+
+    /**
+     * The InitialReadGapOffset class is used to record the gap start and end 
offset of the probable gaps
+     * of available records which are neither known to Persister nor to 
SharePartition. Share Partition
+     * will use this information to determine the next fetch offset and should 
try to fetch the records
+     * in the gap.
+     */
+    // Visible for testing
+    static class InitialReadGapOffset {
+        private final long endOffset;
+        private long gapStartOffset;
+
+        InitialReadGapOffset(long endOffset, long gapStartOffset) {
+            this.endOffset = endOffset;
+            this.gapStartOffset = gapStartOffset;
+        }
+
+        long endOffset() {
+            return endOffset;
+        }
+
+        long gapStartOffset() {
+            return gapStartOffset;
+        }
+
+        void gapStartOffset(long gapStartOffset) {
+            this.gapStartOffset = gapStartOffset;
+        }
+    }
+
     // Visible for testing
     final class AcquisitionLockTimerTask extends TimerTask {
         private final long expirationMs;
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 54cfa8cbd80..f4a04b3bb42 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -896,6 +896,167 @@ public class SharePartitionTest {
         assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
     }
 
+    @Test
+    public void testMaybeInitializeStateBatchesWithGapAtBeginning() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ARCHIVED.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(2, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(15L));
+        assertNotNull(sharePartition.cachedState().get(21L));
+
+        assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(2, 
sharePartition.cachedState().get(15L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+
+        assertEquals(30, sharePartition.cachedState().get(21L).lastOffset());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(21L).batchState());
+        assertEquals(3, 
sharePartition.cachedState().get(21L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(21L).offsetState());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(10, initialReadGapOffset.gapStartOffset());
+        assertEquals(30, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeStateBatchesWithMultipleGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+                        new PersisterStateBatch(30L, 40L, 
RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(2, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(15L));
+        assertNotNull(sharePartition.cachedState().get(30L));
+
+        assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(2, 
sharePartition.cachedState().get(15L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+
+        assertEquals(40, sharePartition.cachedState().get(30L).lastOffset());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(30L).batchState());
+        assertEquals(3, 
sharePartition.cachedState().get(30L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(30L).offsetState());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(10, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeStateBatchesWithGapNotAtBeginning() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 15L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2),
+                        new PersisterStateBatch(30L, 40L, 
RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        // The start offset will be moved to 21, since the offsets 15 to 20 
are acknowledged, and will be removed
+        // from cached state in the maybeUpdateCachedStateAndOffsets method
+        assertEquals(21, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(21, sharePartition.nextFetchOffset());
+
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(30L));
+
+        assertEquals(40, sharePartition.cachedState().get(30L).lastOffset());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(30L).batchState());
+        assertEquals(3, 
sharePartition.cachedState().get(30L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(30L).offsetState());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(21, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeStateBatchesWithoutGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 15L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2),
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ARCHIVED.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertTrue(sharePartition.cachedState().isEmpty());
+        assertEquals(31, sharePartition.startOffset());
+        assertEquals(31, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(31, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+
+        // Since there are no gaps present in the readState response, 
initialReadGapOffset should be null
+        assertNull(initialReadGapOffset);
+    }
+
     @Test
     public void testAcquireSingleRecord() {
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
@@ -1926,189 +2087,992 @@ public class SharePartitionTest {
         expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
 
-        // Send the same fetch request batch again but only 2 offsets should 
come as acquired.
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 20, 3, records,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            2);
+        // Send the same fetch request batch again but only 2 offsets should 
come as acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            2);
+
+        assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireReleasedRecordMultipleBatches() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+        // First fetch request with 5 records starting from offset 10.
+        MemoryRecords records1 = memoryRecords(5, 10);
+        // Second fetch request with 5 records starting from offset 15.
+        MemoryRecords records2 = memoryRecords(5, 15);
+        // Third fetch request with 5 records starting from offset 23, gap of 
3 offsets.
+        MemoryRecords records3 = memoryRecords(5, 23);
+        // Fourth fetch request with 5 records starting from offset 28.
+        MemoryRecords records4 = memoryRecords(5, 28);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(20, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records4,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(33, sharePartition.nextFetchOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(28L).batchState());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertNull(sharePartition.cachedState().get(28L).offsetState());
+
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(12, 30, 
Collections.singletonList((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(10L).batchState());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(15L).batchMemberId());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(23L).batchState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(23L).batchMemberId());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(28L).batchState());
+        assertNotNull(sharePartition.cachedState().get(28L).offsetState());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
+
+        expectedOffsetStateMap.clear();
+        expectedOffsetStateMap.put(28L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(29L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(30L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(31L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(32L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(28L).offsetState());
+
+        // Send next batch from offset 12, only 3 records should be acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            3);
+
+        assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Although the records2 batch is still available to acquire, send the records3 batch; it 
should be acquired but
+        // next fetch offset should not move.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Acquire partial records from batch 2.
+        MemoryRecords subsetRecords = memoryRecords(2, 17);
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            2);
+
+        assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Acquire partial records from record 4 to further test if the next 
fetch offset moves
+        // accordingly once complete record 2 is also acquired.
+        subsetRecords = memoryRecords(1, 28);
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            1);
+
+        assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Try to acquire complete record 2 though it's already partially 
acquired, the next fetch
+        // offset should move.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            3);
+
+        // Offset 15,16 and 19 should be acquired.
+        List<AcquiredRecords> expectedAcquiredRecords = 
expectedAcquiredRecords(15, 16, 2);
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should move to 29, the first offset that is still available.
+        assertEquals(29, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireGapAtBeginningAndRecordsFetchedFromGap() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // All records fetched are part of the gap. The gap is from 11 to 20, 
fetched offsets are 11 to 15.
+        MemoryRecords records = memoryRecords(5, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecord(11, 15, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(16, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(16, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightBatches() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Fetched offsets overlap the inFlight batches. The gap is from 11 to 
20, but fetched records are from 11 to 25.
+        MemoryRecords records = memoryRecords(15, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(41, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(21, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightAvailableBatches() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Fetched offsets overlap the inFlight batches. The gap is from 11 to 
20, but fetched records are from 11 to 25.
+        MemoryRecords records = memoryRecords(15, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            15);
+
+        // The gap from 11 to 20 will be acquired. Since the next batch is 
AVAILABLE, and the records fetched from replica manager
+        // overlap with the next batch, some records from the next batch will 
also be acquired
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 3));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(26, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(26, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireWhenCachedStateContainsGapsAndRecordsFetchedFromNonGapOffset() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(11L, 20L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Fetched records are part of inFlightBatch 11-20 with state 
AVAILABLE. Fetched offsets also overlap the
+        // gap that follows. The gap is from 21 to 30, but fetched records are 
from 11 to 25.
+        MemoryRecords records = memoryRecords(15, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            15);
+
+        // 2 different batches will be acquired this time (11-20 and 21-25). 
The first batch will have delivery count 3
+        // as previous deliveryCount was 2. The second batch will have 
delivery count 1 as it is acquired for the first time.
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 25, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(26, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(26, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireGapAtBeginningAndFetchedRecordsOverlapMultipleInFlightBatches() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40
+                        new PersisterStateBatch(61L, 70L, 
RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60
+                        new PersisterStateBatch(81L, 90L, 
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        MemoryRecords records = memoryRecords(75, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            55);
+
+        // Acquired batches will contain the following ->
+        // 1. 11-20 (gap offsets)
+        // 2. 31-40 (gap offsets)
+        // 3. 41-50 (AVAILABLE batch in cachedState)
+        // 4. 51-60 (gap offsets)
+        // 5. 71-80 (gap offsets)
+        // 6. 81-85 (AVAILABLE batch in cachedState). These will be acquired 
as separate batches because we are breaking a batch in the cachedState
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 80, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 81, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(82, 82, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(83, 83, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(84, 84, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(85, 85, 2));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(90, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(86, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(86, initialReadGapOffset.gapStartOffset());
+        assertEquals(90, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testAcquireGapAtBeginningAndFetchedRecordsEndJustBeforeGap() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.ACKNOWLEDGED.id, (short) 1), // There is a gap from 31 to 40
+                        new PersisterStateBatch(61L, 70L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 51 to 60
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        MemoryRecords records = memoryRecords(20, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            20);
+
+        // Acquired batches will contain the following ->
+        // 1. 11-20 (gap offsets)
+        // 2. 21-30 (AVAILABLE batch in cachedState)
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 3));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(70, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(31, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(31, initialReadGapOffset.gapStartOffset());
+        assertEquals(70, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireGapAtBeginningAndFetchedRecordsIncludeGapOffsetsAtEnd() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40
+                        new PersisterStateBatch(61L, 70L, 
RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60
+                        new PersisterStateBatch(81L, 90L, 
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        MemoryRecords records = memoryRecords(65, 11);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            45);
+
+        // Acquired batches will contain the following ->
+        // 1. 11-20 (gap offsets)
+        // 2. 31-40 (gap offsets)
+        // 3. 41-50 (AVAILABLE batch in cachedState)
+        // 4. 51-60 (gap offsets)
+        // 5. 71-75 (gap offsets). The gap is from 71 to 80, but the fetched 
records end at 75. These gap offsets will be acquired as a single batch
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 75, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(90, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(76, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // After records are acquired, the initialReadGapOffset should be 
updated
+        assertEquals(76, initialReadGapOffset.gapStartOffset());
+        assertEquals(90, initialReadGapOffset.endOffset());
+    }
+
+
+    @Test
+    public void 
testAcquireWhenRecordsFetchedFromGapAndMaxFetchRecordsIsExceeded() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(11L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2),
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Creating 3 batches of records with a total of 8 records
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 3, 21).close();
+        memoryRecordsBuilder(buffer, 3, 24).close();
+        memoryRecordsBuilder(buffer, 2, 27).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                6, // maxFetchRecords is less than the number of records 
fetched
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            6);
+
+        // Since max fetch records (6) is less than the number of records 
fetched (8), only 6 records will be acquired
+        assertArrayEquals(expectedAcquiredRecord(21, 26, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(21, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(27, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(27, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testAcquireMaxFetchRecordsExceededAfterAcquiringGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11-20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Creating 2 batches of records with a total of 20 records
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 10, 11).close();
+        memoryRecordsBuilder(buffer, 10, 21).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                8, // maxFetchRecords is less than the number of records 
fetched
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(21, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(21, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void testAcquireMaxFetchRecordsExceededBeforeAcquiringGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(11L, 20L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(31L, 40L, 
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 21-30
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Creating 2 batches of records with a total of 30 records
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 10, 11).close();
+        memoryRecordsBuilder(buffer, 20, 21).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                8, // maxFetchRecords is less than the number of records 
fetched
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecord(11, 20, 3).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(21, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(21, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
+    @Test
+    public void 
testAcquireWhenRecordsFetchedFromGapAndPartitionContainsNaturalGaps() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(15L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+                        new PersisterStateBatch(30L, 40L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-29
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 11, 10).close();
+        memoryRecordsBuilder(buffer, 21, 30).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            24);
+
+        // Acquired batches will contain the following ->
+        // 1. 10-14 (gap offsets)
+        // 2. 21-29 (gap offsets)
+        // 3. 41-50 (new offsets)
+        // The offsets fetched from partition include a natural gap from 21 to 
29. The cached state also contains the
+        // gap from 21 to 29. But since the broker does not parse the fetched 
records, the broker is not aware of this
+        // natural gap. In this case, the gap will be acquired, and it is the 
client's responsibility to inform the
+        // broker about this gap.
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(10, 14, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 29, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(50, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(51, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNull(initialReadGapOffset);
+    }
+
+    @Test
+    public void 
testAcquireCachedStateInitialGapMatchesWithActualPartitionGap() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Creating 2 batches starting from 21, such that there is a natural 
gap from 11 to 20
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 15, 21).close();
+        memoryRecordsBuilder(buffer, 25, 36).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            20);
+
+        // Acquired batches will contain the following ->
+        // 1. 31-40 (gap offsets)
+        // 2. 51-60 (new offsets)
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(31, 40, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
 
-        assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(), 
acquiredRecordsList.toArray());
-        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(60, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(61, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNull(initialReadGapOffset);
     }
 
     @Test
-    public void testAcquireReleasedRecordMultipleBatches() {
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
-        // First fetch request with 5 records starting from offset 10.
-        MemoryRecords records1 = memoryRecords(5, 10);
-        // Second fetch request with 5 records starting from offset 15.
-        MemoryRecords records2 = memoryRecords(5, 15);
-        // Third fetch request with 5 records starting from offset 23, gap of 
3 offsets.
-        MemoryRecords records3 = memoryRecords(5, 23);
-        // Fourth fetch request with 5 records starting from offset 28.
-        MemoryRecords records4 = memoryRecords(5, 28);
+    public void 
testAcquireCachedStateInitialGapOverlapsWithActualPartitionGap() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Creating 2 batches starting from 16, such that there is a natural 
gap from 11 to 15
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 20, 16).close();
+        memoryRecordsBuilder(buffer, 25, 36).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
 
         List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 40, 3, records1,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            5);
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            25);
 
-        assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), 
acquiredRecordsList.toArray());
-        assertEquals(15, sharePartition.nextFetchOffset());
+        // Acquired batches will contain the following ->
+        // 1. 16-20 (gap offsets)
+        // 2. 31-40 (gap offsets)
+        // 3. 51-60 (new offsets)
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(16, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
 
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 30, 3, records2,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            5);
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(16, sharePartition.startOffset());
+        assertEquals(60, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(61, sharePartition.nextFetchOffset());
 
-        assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), 
acquiredRecordsList.toArray());
-        assertEquals(20, sharePartition.nextFetchOffset());
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNull(initialReadGapOffset);
+    }
 
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 30, 3, records3,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            5);
+    @Test
+    public void 
testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGap() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
 
-        assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), 
acquiredRecordsList.toArray());
-        assertEquals(28, sharePartition.nextFetchOffset());
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 30, 3, records4,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            5);
+        sharePartition.maybeInitialize();
 
-        assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), 
acquiredRecordsList.toArray());
-        assertEquals(33, sharePartition.nextFetchOffset());
+        // Creating 3 batches starting from 11, such that there is a natural 
gap from 36 to 40
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 10, 11).close();
+        memoryRecordsBuilder(buffer, 15, 21).close();
+        memoryRecordsBuilder(buffer, 20, 41).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
 
-        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
-        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
-        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
-        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(28L).batchState());
-        assertNull(sharePartition.cachedState().get(10L).offsetState());
-        assertNull(sharePartition.cachedState().get(15L).offsetState());
-        assertNull(sharePartition.cachedState().get(23L).offsetState());
-        assertNull(sharePartition.cachedState().get(28L).offsetState());
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            30);
+
+        // Acquired batches will contain the following ->
+        // 1. 11-20 (gap offsets)
+        // 2. 31-40 (gap offsets)
+        // 3. 51-60 (new offsets)
+        // The entire gap of 31 to 40 will be acquired even when the fetched 
records only contain offsets 31 to 35 because
+        // we rely on the client to inform the broker about these natural gaps 
in the partition log
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
 
-        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
-            MEMBER_ID,
-            Collections.singletonList(new ShareAcknowledgementBatch(12, 30, 
Collections.singletonList((byte) 2))));
-        assertNull(ackResult.join());
-        assertFalse(ackResult.isCompletedExceptionally());
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(60, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(61, sharePartition.nextFetchOffset());
 
-        assertEquals(12, sharePartition.nextFetchOffset());
-        assertEquals(4, sharePartition.cachedState().size());
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(10L).batchState());
-        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
-        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
-        assertNull(sharePartition.cachedState().get(15L).offsetState());
-        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(15L).batchMemberId());
-        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(23L).batchState());
-        assertNull(sharePartition.cachedState().get(23L).offsetState());
-        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(23L).batchMemberId());
-        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(28L).batchState());
-        assertNotNull(sharePartition.cachedState().get(28L).offsetState());
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNull(initialReadGapOffset);
+    }
 
-        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
-        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
-        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
-        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
+    @Test
+    public void testAcquireWhenRecordsFetchedAfterGapsAreFetched() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(11L, 20L, 
RecordState.ACKNOWLEDGED.id, (short) 2),
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21 to 30
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
 
-        expectedOffsetStateMap.clear();
-        expectedOffsetStateMap.put(28L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(29L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(30L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
-        expectedOffsetStateMap.put(31L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
-        expectedOffsetStateMap.put(32L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
-        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(28L).offsetState());
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
 
-        // Send next batch from offset 12, only 3 records should be acquired.
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 40, 3, records1,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            3);
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
 
-        assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(), 
acquiredRecordsList.toArray());
-        assertEquals(15, sharePartition.nextFetchOffset());
+        sharePartition.maybeInitialize();
 
-        // Though record2 batch exists to acquire but send batch record3, it 
should be acquired but
-        // next fetch offset should not move.
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+        // Fetched records are from 21 to 35
+        MemoryRecords records = memoryRecords(15, 21);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
             MEMBER_ID,
             BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 40, 3, records3,
+            MAX_FETCH_RECORDS, // maxFetchRecords does not limit this acquisition
+            new FetchPartitionData(Errors.NONE, 3, 0, records,
                 Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            5);
+            10);
 
-        assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(), 
acquiredRecordsList.toArray());
-        assertEquals(15, sharePartition.nextFetchOffset());
+        // Since the gap is only from 21 to 30 and the next batch is ARCHIVED, 
only 10 gap offsets will be acquired as a single batch
+        assertArrayEquals(expectedAcquiredRecord(21, 30, 1).toArray(), 
acquiredRecordsList.toArray());
 
-        // Acquire partial records from batch 2.
-        MemoryRecords subsetRecords = memoryRecords(2, 17);
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            2);
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(21, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(41, sharePartition.nextFetchOffset());
 
-        assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), 
acquiredRecordsList.toArray());
-        // Next fetch offset should not move.
-        assertEquals(15, sharePartition.nextFetchOffset());
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
 
-        // Acquire partial records from record 4 to further test if the next 
fetch offset move
-        // accordingly once complete record 2 is also acquired.
-        subsetRecords = memoryRecords(1, 28);
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            1);
+        assertEquals(31, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
 
-        assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(), 
acquiredRecordsList.toArray());
-        // Next fetch offset should not move.
-        assertEquals(15, sharePartition.nextFetchOffset());
+        // Fetching from the nextFetchOffset so that endOffset moves ahead
+        records = memoryRecords(15, 41);
 
-        // Try to acquire complete record 2 though it's already partially 
acquired, the next fetch
-        // offset should move.
         acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 20, 3, records2,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            3);
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS, // maxFetchRecords does not limit this acquisition
+                new FetchPartitionData(Errors.NONE, 3, 0, records,
+                    Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            15);
 
-        // Offset 15,16 and 19 should be acquired.
-        List<AcquiredRecords> expectedAcquiredRecords = 
expectedAcquiredRecords(15, 16, 2);
-        expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
-        // Next fetch offset should not move.
-        assertEquals(29, sharePartition.nextFetchOffset());
+        assertArrayEquals(expectedAcquiredRecord(41, 55, 1).toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(21, sharePartition.startOffset());
+        assertEquals(55, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(56, sharePartition.nextFetchOffset());
+
+        // Since the endOffset is now moved ahead, the initialReadGapOffset 
should be empty
+        initialReadGapOffset = sharePartition.initialReadGapOffset();
+        assertNull(initialReadGapOffset);
     }
 
     @Test
@@ -5193,6 +6157,55 @@ public class SharePartitionTest {
         assertEquals(200, sharePartition.nextFetchOffset());
     }
 
+    @Test
+    public void testMaybeUpdateCachedStateGapAfterLastOffsetAcknowledged() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(11L, 20L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21 to 30
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Acquiring the first AVAILABLE batch from 11 to 20
+        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, new 
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 11),
+            Optional.empty(), OptionalLong.empty(), Optional.empty(),
+            OptionalInt.empty(), false));
+        assertTrue(sharePartition.canAcquireRecords());
+
+        // Sending acknowledgment for the first batch from 11 to 20
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(11, 20, List.of((byte) 1))));
+
+        assertTrue(sharePartition.canAcquireRecords());
+        // After the acknowledgement completes successfully, the 
maybeUpdateCachedStateAndOffsets method is invoked to see
+        // if the start offset can be moved ahead. The last offset 
acknowledged is 20. But instead of moving the start
+        // offset to the next batch in the cached state (31 to 40), it is 
moved to the offset immediately after the last
+        // acknowledged offset (21). This is because there is an acquirable 
gap in the cached state from 21 to 30.
+        assertEquals(21, sharePartition.startOffset());
+        assertEquals(40, sharePartition.endOffset());
+        assertEquals(21, sharePartition.nextFetchOffset());
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        assertEquals(21, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+    }
+
     @Test
     public void testCanAcquireRecordsReturnsTrue() {
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
@@ -5749,16 +6762,16 @@ public class SharePartitionTest {
     @Test
     public void 
testMaybeInitializeWhenReadStateRpcReturnsZeroAvailableRecords() {
         List<PersisterStateBatch> stateBatches = new ArrayList<>();
+        stateBatches.add(new PersisterStateBatch(233L, 233L, 
RecordState.ARCHIVED.id, (short) 1));
         for (int i = 0; i < 500; i++) {
             stateBatches.add(new PersisterStateBatch(234L + i, 234L + i, 
RecordState.ACKNOWLEDGED.id, (short) 1));
         }
-        stateBatches.add(new PersisterStateBatch(232L, 232L, 
RecordState.ARCHIVED.id, (short) 1));
 
         Persister persister = Mockito.mock(Persister.class);
         ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
         
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
                 new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
-                        PartitionFactory.newPartitionAllData(0, 3, 232L, 
Errors.NONE.code(), Errors.NONE.message(),
+                        PartitionFactory.newPartitionAllData(0, 3, 233L, 
Errors.NONE.code(), Errors.NONE.message(),
                                 stateBatches)))));
         
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -5845,6 +6858,37 @@ public class SharePartitionTest {
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
     }
 
+    @Test
+    public void testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    Arrays.asList(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(31L, 40L, 
RecordState.ARCHIVED.id, (short) 1)
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        SharePartition.InitialReadGapOffset initialReadGapOffset = 
sharePartition.initialReadGapOffset();
+        assertNotNull(initialReadGapOffset);
+
+        // Since there is a gap at the beginning, the initialReadGapOffset 
window is the same as the cachedState
+        assertEquals(11, initialReadGapOffset.gapStartOffset());
+        assertEquals(40, initialReadGapOffset.endOffset());
+
+        long lastOffsetAcknowledged = 
sharePartition.findLastOffsetAcknowledged();
+
+        // Since the initialReadGapOffset window begins at startOffset, we 
cannot count any of the offsets as acknowledged.
+        // Thus, lastOffsetAcknowledged should be -1
+        assertEquals(-1, lastOffsetAcknowledged);
+    }
+
     private List<AcquiredRecords> fetchAcquiredRecords(ShareAcquiredRecords 
shareAcquiredRecords, int expectedOffsetCount) {
         assertNotNull(shareAcquiredRecords);
         assertEquals(expectedOffsetCount, shareAcquiredRecords.count());

Reply via email to