This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new df5839a9f43 KAFKA-17351: Improved handling of compacted topics in 
share partition (2/N) (#19010)
df5839a9f43 is described below

commit df5839a9f43fd0e66e553a468ea04c0999778a78
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Feb 25 14:11:39 2025 +0000

    KAFKA-17351: Improved handling of compacted topics in share partition (2/N) 
(#19010)
    
    The PR handles fetch for `compacted` topics. The fix was required only
    when complete batch disappears from the topic log, and same batch is
    marked re-available in Share Partition state cache. Subsequent log reads
    will not result the disappeared batch in read response hence respective
    batch will be left as available in the state cache.
    
    The PR checks for the first fetched/read batch base offset and if it's
    greater than the position from where the read occurred (fetch offset)
    then if there exists any `available` batches in the state cache then
    they will be archived.
    
    Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit 
<[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 139 +++++++-
 .../kafka/server/share/SharePartitionTest.java     | 351 ++++++++++++++++++++-
 .../acknowledge/ShareAcknowledgementBatch.java     |  29 +-
 3 files changed, 465 insertions(+), 54 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index c158dc8019e..9c358cf4c1e 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -615,13 +615,44 @@ public class SharePartition {
      * Acquire the fetched records for the share partition. The acquired 
records are added to the
      * in-flight records and the next fetch offset is updated to the next 
offset that should be
      * fetched from the leader.
+     * <p>
+     * The method always acquire the full batch records. The cache state can 
consist of multiple
+     * full batches as a single batch. This behavior is driven by client 
configurations (batch size
+     * and max fetch records) and allows for efficient client 
acknowledgements. However, partial batches
+     * can exist in the cache only after a leader change and partial 
acknowledgements have been persisted
+     * prior leader change. In such case, when a share partition loses track 
of a batch's start and
+     * end offsets (e.g., after a leader change and partial acknowledgements), 
the cache stores the
+     * batch based on the offset range provided by the persister. This method 
handles these special
+     * batches by maintaining this range up to the last offset returned by the 
persister.  No special
+     * handling is required afterward; the cache will eventually return to 
managing full batches.
+     * <p>
+     * For compacted topics, batches may be non-contiguous, and records within 
cached batches may contain gaps.
+     * Because this method operates at the batch level, it acquires entire 
batches and relies on the
+     * client to report any gaps in the data. Whether non-contiguous batches 
are acquired depends on
+     * the first and last offsets of the fetched batches. Batches outside of 
this boundary will never
+     * be acquired. For instance, if fetched batches cover offsets [0-9 and 
20-29], and the configured
+     * batch size and maximum fetch records are large enough (greater than 30 
in this example), the
+     * intervening batch [10-19] will be acquired. Since full fetched batch is 
acquired, the client is
+     * responsible for reporting any data gaps. However, if the [0-9] and 
[20-29] ranges are fetched
+     * in separate calls to this method, the [10-19] batch will not be 
acquired and cannot exist in
+     * the cache.
+     * <p>
+     * However, for compacted topics, previously acquired batches (e.g., due 
to acquisition lock timeout
+     * or explicit client release) might become available for acquisition 
again. But subsequent fetches
+     * may reveal that these batches, or parts of them, have been removed by 
compaction. Because this
+     * method works with whole batches, the disappearance of individual 
offsets within a batch requires
+     * no special handling; the batch will be re-acquired, and the client will 
report the gaps. But if
+     * an entire batch has been compacted away, this method must archive it in 
the cache to allow the
+     * Share Partition Start Offset (SPSO) to progress. This is accomplished 
by comparing the fetchOffset
+     * (the offset from which the log was read) with the first base offset of 
the fetch response. Any
+     * batches from fetchOffset to first base offset of the fetch response are 
archived.
      *
      * @param memberId           The member id of the client that is fetching 
the record.
      * @param batchSize          The number of records per acquired records 
batch.
      * @param maxFetchRecords    The maximum number of records that should be 
acquired, this is a soft
      *                           limit and the method might acquire more 
records than the maxFetchRecords,
      *                           if the records are already part of the same 
fetch batch.
-*    * @param fetchOffset        The fetch offset for which the records are 
fetched.
+     * @param fetchOffset        The fetch offset for which the records are 
fetched.
      * @param fetchPartitionData The fetched records for the share partition.
      * @return The acquired records for the share partition.
      */
@@ -630,7 +661,7 @@ public class SharePartition {
         String memberId,
         int batchSize,
         int maxFetchRecords,
-        long fetchOffset /* TODO: Use fetch offset to archive any stale 
batches due to compaction */,
+        long fetchOffset,
         FetchPartitionData fetchPartitionData
     ) {
         log.trace("Received acquire request for share partition: {}-{} 
memberId: {}", groupId, topicIdPartition, memberId);
@@ -651,15 +682,23 @@ public class SharePartition {
         lock.writeLock().lock();
         try {
             long baseOffset = firstBatch.baseOffset();
+
+            // There might be cached batches which are stale due to topic 
compaction hence archive them.
+            maybeArchiveStaleBatches(fetchOffset, baseOffset);
+
             // Find the floor batch record for the request batch. The request 
batch could be
             // for a subset of the in-flight batch i.e. cached batch of offset 
10-14 and request batch
             // of 12-13. Hence, floor entry is fetched to find the sub-map.
-            Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(baseOffset);
+            Map.Entry<Long, InFlightBatch> floorEntry = 
cachedState.floorEntry(baseOffset);
             // We might find a batch with floor entry but not necessarily that 
batch has an overlap,
             // if the request batch base offset is ahead of last offset from 
floor entry i.e. cached
             // batch of 10-14 and request batch of 15-18, though floor entry 
is found but no overlap.
-            if (floorOffset != null && floorOffset.getValue().lastOffset() >= 
baseOffset) {
-                baseOffset = floorOffset.getKey();
+            // Such scenario will be handled in the next step when considering 
the subMap. However,
+            // if the floor entry is found and the request batch base offset 
is within the floor entry
+            // then adjust the base offset to the floor entry so that acquire 
method can still work on
+            // previously cached batch boundaries.
+            if (floorEntry != null && floorEntry.getValue().lastOffset() >= 
baseOffset) {
+                baseOffset = floorEntry.getKey();
             }
             // Validate if the fetch records are already part of existing 
batches and if available.
             NavigableMap<Long, InFlightBatch> subMap = 
cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
@@ -1061,25 +1100,97 @@ public class SharePartition {
         }
     }
 
+    /**
+     * The method archives the available records in the cached state that are 
between the fetch offset
+     * and the base offset of the first fetched batch. This method is required 
to handle the compacted
+     * topics where the already fetched batch which is marked re-available, 
might not result in subsequent
+     * fetch response from log. Hence, the batches need to be archived to 
allow the SPSO and next fetch
+     * offset to progress.
+     *
+     * @param fetchOffset The fetch offset.
+     * @param baseOffset  The base offset of the first fetched batch.
+     */
+    private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) {
+        lock.writeLock().lock();
+        try {
+            // If the fetch happens from within a batch then fetchOffset can 
be ahead of base offset else
+            // should be same as baseOffset of the first fetched batch. 
Otherwise, we might need to archive
+            // some stale batches.
+            if (cachedState.isEmpty() || fetchOffset >= baseOffset) {
+                // No stale batches to archive.
+                return;
+            }
+
+            // The fetch offset can exist in the middle of the batch. Hence, 
find the floor offset
+            // for the fetch offset and then find the sub-map from the floor 
offset to the base offset.
+            long floorOffset = fetchOffset;
+            Map.Entry<Long, InFlightBatch> floorEntry = 
cachedState.floorEntry(fetchOffset);
+            if (floorEntry != null && floorEntry.getValue().lastOffset() >= 
fetchOffset) {
+                floorOffset = floorEntry.getKey();
+            }
+
+            NavigableMap<Long, InFlightBatch> subMap = 
cachedState.subMap(floorOffset, true, baseOffset, false);
+            if (subMap.isEmpty()) {
+                // No stale batches to archive.
+                return;
+            }
+
+            // Though such batches can be removed from the cache, but it is 
better to archive them so
+            // that they are never acquired again.
+            boolean anyRecordArchived = archiveAvailableRecords(fetchOffset, 
baseOffset, subMap);
+
+            // If we have transitioned the state of any batch/offset from 
AVAILABLE to ARCHIVED,
+            // then there is a chance that the next fetch offset can change.
+            if (anyRecordArchived) {
+                findNextFetchOffset.set(true);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * The method archives the available records in the cached state that are 
before the log start offset.
+     *
+     * @param logStartOffset The log start offset.
+     * @return A boolean which indicates whether any record is archived or not.
+     */
     private boolean archiveAvailableRecordsOnLsoMovement(long logStartOffset) {
+        lock.writeLock().lock();
+        try {
+            return archiveAvailableRecords(startOffset, logStartOffset, 
cachedState);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * The method archive the available records in the given map that are 
before the end offset.
+     *
+     * @param startOffset The offset from which the available records should 
be archived.
+     * @param endOffset The offset before which the available records should 
be archived.
+     * @param map The map containing the in-flight records.
+     * @return A boolean which indicates whether any record is archived or not.
+     */
+    private boolean archiveAvailableRecords(long startOffset, long endOffset, 
NavigableMap<Long, InFlightBatch> map) {
         lock.writeLock().lock();
         try {
             boolean isAnyOffsetArchived = false, isAnyBatchArchived = false;
-            for (Map.Entry<Long, InFlightBatch> entry : 
cachedState.entrySet()) {
+            for (Map.Entry<Long, InFlightBatch> entry : map.entrySet()) {
                 long batchStartOffset = entry.getKey();
-                // We do not need to transition state of batches/offsets that 
are later than the new log start offset.
-                if (batchStartOffset >= logStartOffset) {
+                // We do not need to transition state of batches/offsets that 
are later than the offset.
+                if (batchStartOffset >= endOffset) {
                     break;
                 }
                 InFlightBatch inFlightBatch = entry.getValue();
-                boolean fullMatch = checkForFullMatch(inFlightBatch, 
startOffset, logStartOffset - 1);
+                boolean fullMatch = checkForFullMatch(inFlightBatch, 
startOffset, endOffset - 1);
 
                 // Maintain state per offset if the inflight batch is not a 
full match or the offset state is managed.
                 if (!fullMatch || inFlightBatch.offsetState() != null) {
-                    log.debug("Subset or offset tracked batch record found 
while trying to update offsets and cached" +
-                                    " state map due to LSO movement, batch: 
{}, offsets to update - " +
-                                    "first: {}, last: {} for the share 
partition: {}-{}", inFlightBatch, startOffset,
-                            logStartOffset - 1, groupId, topicIdPartition);
+                    log.debug("Subset or offset tracked batch record found 
while trying to update offsets "
+                        + "and cached state map, batch: {}, offsets to update 
- first: {}, last: {} "
+                        + "for the share partition: {}-{}", inFlightBatch, 
startOffset, endOffset - 1,
+                        groupId, topicIdPartition);
 
                     if (inFlightBatch.offsetState() == null) {
                         if (inFlightBatch.batchState() != 
RecordState.AVAILABLE) {
@@ -1087,7 +1198,7 @@ public class SharePartition {
                         }
                         inFlightBatch.maybeInitializeOffsetStateUpdate();
                     }
-                    isAnyOffsetArchived = isAnyOffsetArchived || 
archivePerOffsetBatchRecords(inFlightBatch, startOffset, logStartOffset - 1);
+                    isAnyOffsetArchived = isAnyOffsetArchived || 
archivePerOffsetBatchRecords(inFlightBatch, startOffset, endOffset - 1);
                     continue;
                 }
                 // The in-flight batch is a full match hence change the state 
of the complete batch.
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 6257e87f96e..fec9b346cbb 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -22,9 +22,11 @@ import kafka.server.share.SharePartition.RecordState;
 import kafka.server.share.SharePartition.SharePartitionState;
 import kafka.server.share.SharePartitionManager.SharePartitionListener;
 
+import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedStateEpochException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -39,6 +41,7 @@ import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -109,6 +112,7 @@ public class SharePartitionTest {
     private static final int BATCH_SIZE = 500;
     private static final int DEFAULT_FETCH_OFFSET = 0;
     private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE;
+    private static final byte ACKNOWLEDGE_TYPE_GAP_ID = 0;
 
     @BeforeEach
     public void setUp() {
@@ -1224,7 +1228,15 @@ public class SharePartitionTest {
     @Test
     public void testAcquireWithEmptyFetchRecords() {
         SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
-        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, MemoryRecords.EMPTY, 0);
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+            sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                MAX_FETCH_RECORDS,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData(MemoryRecords.EMPTY)),
+            0
+        );
 
         assertEquals(0, acquiredRecordsList.size());
         assertEquals(0, sharePartition.nextFetchOffset());
@@ -3585,8 +3597,8 @@ public class SharePartitionTest {
         recordsBuilder.appendWithOffset(20, 0L, 
TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
         MemoryRecords records2 = recordsBuilder.build();
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
-        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records2));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1));
+        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(records2));
 
         // Acknowledging over subset of second batch with subset of gap 
offsets.
         sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new 
ShareAcknowledgementBatch(10, 18, Arrays.asList(
@@ -3655,8 +3667,8 @@ public class SharePartitionTest {
         recordsBuilder.appendWithOffset(20, 0L, 
TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
         MemoryRecords records2 = recordsBuilder.build();
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
-        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records2));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1));
+        sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(records2));
 
         // Acknowledging over subset of second batch with subset of gap 
offsets.
         sharePartition.acknowledge(MEMBER_ID, Collections.singletonList(new 
ShareAcknowledgementBatch(10, 18, Arrays.asList(
@@ -4588,7 +4600,7 @@ public class SharePartitionTest {
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(memoryRecords(5, 15)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)));
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5);
@@ -4726,7 +4738,7 @@ public class SharePartitionTest {
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(memoryRecords(5, 15)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)));
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5);
@@ -5804,7 +5816,7 @@ public class SharePartitionTest {
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
 
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(memoryRecords(5, 10)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(5, 10)));
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 15), 5);
 
@@ -5835,7 +5847,7 @@ public class SharePartitionTest {
 
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
         fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5);
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(memoryRecords(5, 15)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, 
fetchPartitionData(memoryRecords(5, 15)));
 
         CompletableFuture<Void> ackResult = 
sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
                 new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 2)),
@@ -5957,7 +5969,7 @@ public class SharePartitionTest {
         assertFalse(sharePartition.findNextFetchOffset());
         assertEquals(10, sharePartition.nextFetchOffset());
 
-        sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(memoryRecords(10, 10)));
+        sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(10, 10)));
 
         assertFalse(sharePartition.findNextFetchOffset());
         assertEquals(20, sharePartition.nextFetchOffset());
@@ -5992,7 +6004,7 @@ public class SharePartitionTest {
                 new ShareAcknowledgementBatch(0, 2, 
Collections.singletonList((byte) 2))));
         assertEquals(0, sharePartition.nextFetchOffset());
 
-        sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(memoryRecords(2, 3)));
+        sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 3, 
fetchPartitionData(memoryRecords(2, 3)));
         assertEquals(0, sharePartition.nextFetchOffset());
 
         sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
@@ -6036,11 +6048,11 @@ public class SharePartitionTest {
                 new ShareAcknowledgementBatch(17, 20, 
Collections.singletonList((byte) 2))));
 
         // Reacquire with another member.
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(records1));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 5, 
fetchPartitionData(records1));
         assertEquals(10, sharePartition.nextFetchOffset());
 
         // Reacquire with another member.
-        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
DEFAULT_FETCH_OFFSET, fetchPartitionData(memoryRecords(7, 10)));
+        sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, 
fetchPartitionData(memoryRecords(7, 10)));
         assertEquals(17, sharePartition.nextFetchOffset());
 
         assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
@@ -6189,6 +6201,307 @@ public class SharePartitionTest {
         assertEquals(-1, lastOffsetAcknowledged);
     }
 
+    /**
+     * Test the case where the fetch batch has first record offset greater 
than the record batch start offset.
+     * Such batches can exist for compacted topics.
+     */
+    @Test
+    public void 
testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        // Set the base offset at 5.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append records from offset 10.
+            memoryRecords(2, 10).records().forEach(builder::append);
+            // Append records from offset 15.
+            memoryRecords(2, 15).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Complete batch from 5-16 will be acquired, hence 12 records.
+        fetchAcquiredRecords(sharePartition, records, 12);
+        // Partially acknowledge the batch from 5-16.
+        sharePartition.acknowledge(MEMBER_ID, Arrays.asList(
+            new ShareAcknowledgementBatch(5, 9, 
List.of(ACKNOWLEDGE_TYPE_GAP_ID)),
+            new ShareAcknowledgementBatch(10, 11, 
List.of(AcknowledgeType.ACCEPT.id)),
+            new ShareAcknowledgementBatch(12, 14, 
List.of(AcknowledgeType.REJECT.id)),
+            new ShareAcknowledgementBatch(15, 16, 
List.of(AcknowledgeType.RELEASE.id))));
+
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L));
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+
+        // Check cached state.
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(7L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(8L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(9L, new InFlightState(RecordState.ARCHIVED, 
(short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(15L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(16L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(5L).offsetState());
+    }
+
+    /**
+     * Test the case where the available cached batches never appear again in 
fetch response within the
+     * previous fetch offset range. Also remove records from the previous 
fetch batches.
+     * <p>
+     * Such case can arise with compacted topics where complete batches are 
removed or records within
+     * batches are removed.
+     */
+    @Test
+    public void 
testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Create 3 batches of records for a single acquire.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 0).close();
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire batch (0-34) which shall create single cache entry.
+        fetchAcquiredRecords(sharePartition, records, 35);
+        // Acquire another 3 individual batches of records.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 40), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 45), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 50), 15);
+        // Release all batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 4 entries.
+        assertEquals(4, sharePartition.cachedState().size());
+
+        // Compact all batches and remove some of the batches from the fetch 
response.
+        buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 0, 2)) {
+            // Append only 2 records for 0 offset batch starting from offset 1.
+            memoryRecords(2, 1).records().forEach(builder::append);
+        }
+        // Do not include batch from offset 5. And compact batch starting at 
offset 20.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 20, 2)) {
+            // Append 2 records for 20 offset batch starting from offset 20.
+            memoryRecords(2, 20).records().forEach(builder::append);
+            // And append 2 records matching the end offset of the batch.
+            memoryRecords(2, 33).records().forEach(builder::append);
+        }
+        // Send the full batch at offset 40.
+        memoryRecordsBuilder(buffer, 5, 40).close();
+        // Do not include batch from offset 45. And compact the batch at 
offset 50.
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 50, 2)) {
+            // Append 5 records for 50 offset batch starting from offset 51.
+            memoryRecords(5, 51).records().forEach(builder::append);
+            // Append 2 records for in middle of the batch.
+            memoryRecords(2, 58).records().forEach(builder::append);
+            // And append 1 record prior to the end offset.
+            memoryRecords(1, 63).records().forEach(builder::append);
+        }
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+        // Acquire the new compacted batches. The acquire method determines 
the acquisition range using
+        // the first and last offsets of the fetched batches and acquires all 
available cached batches
+        // within that range. That means the batch from offset 45-49 which is 
not included in the
+        // fetch response will also be acquired. Similarly, for the batch from 
offset 5-19 which is
+        // anyway in the bigger cached batch of 0-34, will also be acquired. 
This avoids iterating
+        // through individual fetched batch boundaries; the client is 
responsible for reporting any
+        // data gaps via acknowledgements. This test also covers the edge case 
where the last fetched
+        // batch is compacted, and its last offset is before the previously 
cached version's last offset.
+        // In this situation, the last batch's offset state tracking is 
initialized. This is handled
+        // correctly because the client will send individual offset 
acknowledgements, which require offset
+        // state tracking anyway. While this last scenario is unlikely in 
practice (as a batch's reported
+        // last offset should remain correct even after compaction), the test 
verifies its proper handling.
+        fetchAcquiredRecords(sharePartition, records, 59);
+        assertEquals(64, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        sharePartition.cachedState().forEach((offset, inFlightState) -> {
+            // All batches other than the last batch should have batch state 
maintained.
+            if (offset < 50) {
+                assertNotNull(inFlightState.batchState());
+                assertEquals(RecordState.ACQUIRED, inFlightState.batchState());
+            } else {
+                assertNotNull(inFlightState.offsetState());
+                inFlightState.offsetState().forEach((recordOffset, 
offsetState) -> {
+                    // All offsets other than the last offset should be 
acquired.
+                    RecordState recordState = recordOffset < 64 ? 
RecordState.ACQUIRED : RecordState.AVAILABLE;
+                    assertEquals(recordState, offsetState.state(), "Incorrect 
state for offset: " + recordOffset);
+                });
+            }
+        });
+    }
+
+    /**
+     * This test verifies that cached batches which are no longer returned in 
fetch responses (starting
+     * from the fetchOffset) are correctly archived. Archiving these batches 
is crucial for the SPSO
+     * and the next fetch offset to advance. Without archiving, these offsets 
would be stuck, as the
+     * cached batches would remain available.
+     * <p>
+     * This scenario can occur with compacted topics when entire batches, 
previously held in the cache,
+     * are removed from the log at the offset where reading occurs.
+     */
+    @Test
+    public void testAcquireWhenBatchesRemovedForFetchOffset() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15);
+        // Release the batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 3 entries.
+        assertEquals(3, sharePartition.cachedState().size());
+
+        // Compact second batch and remove first batch from the fetch response.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append only 4 records for 5th offset batch starting from offset 
6.
+            memoryRecords(4, 6).records().forEach(builder::append);
+        }
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Only second batch should be acquired and first batch offsets should 
be archived. Send
+        // fetchOffset as 0.
+        fetchAcquiredRecords(sharePartition, records, 0, 0, 5);
+        assertEquals(10, sharePartition.nextFetchOffset());
+        // The next fetch offset has been updated, but the start offset should 
remain unchanged since
+        // the acquire operation only marks offsets as archived. The start 
offset will be correctly
+        // updated once any records are acknowledged.
+        assertEquals(0, sharePartition.startOffset());
+
+        // Releasing acquired records updates the cache and moves the start 
offset.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+        // Validate first batch has been removed from the cache.
+        assertEquals(2, sharePartition.cachedState().size());
+        sharePartition.cachedState().forEach((offset, inFlightState) -> {
+            assertNotNull(inFlightState.batchState());
+            assertEquals(RecordState.AVAILABLE, inFlightState.batchState());
+        });
+    }
+
+    /**
+     * This test verifies that cached batches which are no longer returned in 
fetch responses are
+     * correctly archived, when fetchOffset is within an already cached batch. 
Archiving these batches/offsets
+     * is crucial for the SPSO and the next fetch offset to advance.
+     * <p>
+     * This scenario can occur with compacted topics when fetch triggers from 
an offset which is within
+     * a cached batch, and respective batch is removed from the log.
+     */
+    @Test
+    public void testAcquireWhenBatchesRemovedForFetchOffsetWithinBatch() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15);
+        // Acknowledge subset of the first batch offsets.
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            // Accept the 3 offsets of first batch.
+            new ShareAcknowledgementBatch(5, 7, 
List.of(AcknowledgeType.ACCEPT.id)))).join();
+        // Release the remaining batches/offsets in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID).join();
+        // Validate cache has 2 entries.
+        assertEquals(2, sharePartition.cachedState().size());
+
+        // Mark fetch offset within the first batch to 8, first available 
offset.
+        fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 8, 0, 15);
+        assertEquals(25, sharePartition.nextFetchOffset());
+        // The next fetch offset has been updated, but the start offset should 
remain unchanged since
+        // the acquire operation only marks offsets as archived. The start 
offset will be correctly
+        // updated once any records are acknowledged.
+        assertEquals(8, sharePartition.startOffset());
+
+        // Releasing acquired records updates the cache and moves the start 
offset.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+        // Validate first batch has been removed from the cache.
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).batchState());
+    }
+
+    /**
+     * This test verifies that when cached batch consists of multiple fetched 
batches but batches are
+     * removed from the log, starting at fetch offset, then cached batch is 
updated.
+     * <p>
+     * This scenario can occur with compacted topics when entire batches, 
previously held in the cache,
+     * are removed from the log at the offset where reading occurs.
+     */
+    @Test
+    public void 
testAcquireWhenBatchesRemovedForFetchOffsetForSameCachedBatch() {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Create 3 batches of records for a single acquire.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 0).close();
+        memoryRecordsBuilder(buffer, 15, 5).close();
+        memoryRecordsBuilder(buffer, 15, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Acquire batch (0-34) which shall create single cache entry.
+        fetchAcquiredRecords(sharePartition, records, 35);
+        // Release the batches in the cache.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        // Validate cache has 1 entry.
+        assertEquals(1, sharePartition.cachedState().size());
+
+        // Compact second batch and remove first batch from the fetch response.
+        buffer = ByteBuffer.allocate(4096);
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Compression.NONE,
+            TimestampType.CREATE_TIME, 5, 2)) {
+            // Append only 4 records for 5th offset batch starting from offset 
6.
+            memoryRecords(4, 6).records().forEach(builder::append);
+        }
+        buffer.flip();
+        records = MemoryRecords.readableRecords(buffer);
+
+        // Only second batch should be acquired and first batch offsets should 
be archived. Send
+        // fetchOffset as 0.
+        fetchAcquiredRecords(sharePartition, records, 0, 0, 5);
+        assertEquals(10, sharePartition.nextFetchOffset());
+        // The next fetch offset has been updated, but the start offset should 
remain unchanged since
+        // the acquire operation only marks offsets as archived. The start 
offset will be correctly
+        // updated once any records are acknowledged.
+        assertEquals(0, sharePartition.startOffset());
+
+        // Releasing acquired records updates the cache and moves the start 
offset.
+        sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertEquals(5, sharePartition.startOffset());
+        assertEquals(5, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        sharePartition.cachedState().forEach((offset, inFlightState) -> {
+            assertNotNull(inFlightState.offsetState());
+            inFlightState.offsetState().forEach((recordOffset, offsetState) -> 
{
+                RecordState recordState = recordOffset < 5 ? 
RecordState.ARCHIVED : RecordState.AVAILABLE;
+                assertEquals(recordState, offsetState.state());
+            });
+        });
+    }
+
     private FetchPartitionData fetchPartitionData(Records records) {
         return fetchPartitionData(records, 0);
     }
@@ -6199,11 +6512,15 @@ public class SharePartitionTest {
     }
 
     private List<AcquiredRecords> fetchAcquiredRecords(SharePartition 
sharePartition, Records records, long logStartOffset, int expectedOffsetCount) {
+        return fetchAcquiredRecords(sharePartition, records, 
records.batches().iterator().next().baseOffset(), logStartOffset, 
expectedOffsetCount);
+    }
+
+    private List<AcquiredRecords> fetchAcquiredRecords(SharePartition 
sharePartition, Records records, long fetchOffset, long logStartOffset, int 
expectedOffsetCount) {
         ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(
             MEMBER_ID,
             BATCH_SIZE,
             MAX_FETCH_RECORDS,
-            DEFAULT_FETCH_OFFSET,
+            fetchOffset,
             fetchPartitionData(records, logStartOffset));
         return fetchAcquiredRecords(shareAcquiredRecords, expectedOffsetCount);
     }
@@ -6213,7 +6530,7 @@ public class SharePartitionTest {
             MEMBER_ID,
             BATCH_SIZE,
             MAX_FETCH_RECORDS,
-            DEFAULT_FETCH_OFFSET,
+            records.batches().iterator().next().baseOffset(),
             fetchPartitionData(records));
         return fetchAcquiredRecords(shareAcquiredRecords, expectedOffsetCount);
     }
@@ -6229,7 +6546,9 @@ public class SharePartitionTest {
     }
 
     private MemoryRecords memoryRecords(int numOfRecords, long startOffset) {
-        return memoryRecordsBuilder(numOfRecords, startOffset).build();
+        try (MemoryRecordsBuilder builder = memoryRecordsBuilder(numOfRecords, 
startOffset)) {
+            return builder.build();
+        }
     }
 
     private List<AcquiredRecords> expectedAcquiredRecord(long baseOffset, long 
lastOffset, int deliveryCount) {
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
 
b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
index b23104d6dab..50aabb3903e 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java
@@ -25,30 +25,11 @@ import java.util.List;
  * from {@link 
org.apache.kafka.common.message.ShareFetchRequestData.AcknowledgementBatch} and
  * {@link 
org.apache.kafka.common.message.ShareAcknowledgeRequestData.AcknowledgementBatch}
 classes.
  */
-public class ShareAcknowledgementBatch {
-
-    private final long firstOffset;
-    private final long lastOffset;
-    private final List<Byte> acknowledgeTypes;
-
-    public ShareAcknowledgementBatch(long firstOffset, long lastOffset, 
List<Byte> acknowledgeTypes) {
-        this.firstOffset = firstOffset;
-        this.lastOffset = lastOffset;
-        this.acknowledgeTypes = acknowledgeTypes;
-    }
-
-    public long firstOffset() {
-        return firstOffset;
-    }
-
-    public long lastOffset() {
-        return lastOffset;
-    }
-
-    public List<Byte> acknowledgeTypes() {
-        return acknowledgeTypes;
-    }
-
+public record ShareAcknowledgementBatch(
+    long firstOffset,
+    long lastOffset,
+    List<Byte> acknowledgeTypes
+) {
     @Override
     public String toString() {
         return "ShareAcknowledgementBatch(" +


Reply via email to