This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 49ee1fb4f9d KAFKA-19632: Handle overlap batch on partition re-assignment (#20395) 49ee1fb4f9d is described below
commit 49ee1fb4f9deb326d82b86d3a047182f7d96ddfa Author: Apoorv Mittal <apoorvmitta...@gmail.com> AuthorDate: Tue Aug 26 13:25:57 2025 +0100
KAFKA-19632: Handle overlap batch on partition re-assignment (#20395)
The PR fixes the batch alignment issue when partitions are re-assigned. During the initial read of state, the batches can be broken arbitrarily. Say the start offset is 10 and the cache contains the batch [15-18] during initialization. When a fetch happens at offset 10 and the fetched batch contains 10 records, i.e. [10-19], then correct batches will be created if maxFetchRecords is greater than 10. But if maxFetchRecords is less than 10, then the last offset of the batch is determined, which will be 19. Hence the acquire method will incorrectly create a batch of [10-19] while [15-18] already exists. The check below is required to resolve the issue: ``` if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) { lastAcquiredOffset = lastOffset; } ``` While testing other cases, further issues were found with updating the gap offset, acquiring records prior to the share partition's end offset, and determining the next fetch offset with compacted topics. All these issues arise mainly during the initial read window after partition re-assignment.
Reviewers: Andrew Schofield <aschofi...@confluent.io>, Abhinav Dixit <adi...@confluent.io>, Chirag Wadhwa <cwad...@confluent.io>
--- .../java/kafka/server/share/SharePartition.java | 92 ++- .../kafka/server/share/SharePartitionTest.java | 666 +++++++++++++++++++++ .../acknowledge/ShareAcknowledgementBatch.java | 3 + 3 files changed, 749 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index a07f9a12fbb..08a9539dbed 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -572,7 +572,16 @@ public class SharePartition { nextFetchOffset = gapStartOffset; break; } - gapStartOffset = entry.getValue().lastOffset() + 1; + // If the gapStartOffset is already past the last offset of the in-flight batch, + // then do not consider this batch for finding the next fetch offset. For example, + // consider during initialization, the initialReadGapOffset is set to 5 and the + // first cached batch is 15-18. First read will happen at offset 5 and say the data + // fetched is [5-6], now next fetch offset should be 7. This works fine but say + // subsequent read returns batch 8-11, and the gapStartOffset will be 12. Without + // the max check, the next fetch offset returned will be 7 which is incorrect. + // The natural gaps for which no data is available shall be considered hence + // take the max of the gapStartOffset and the last offset of the in-flight batch. + gapStartOffset = Math.max(entry.getValue().lastOffset() + 1, gapStartOffset); } // Check if the state is maintained per offset or batch. If the offsetState @@ -699,16 +708,33 @@ public class SharePartition { // Find the floor batch record for the request batch. The request batch could be // for a subset of the in-flight batch i.e. cached batch of offset 10-14 and request batch - // of 12-13. Hence, floor entry is fetched to find the sub-map.
Secondly, when the share + // partition is initialized with persisted state, the start offset might be moved to a later + // offset. In such case, the first batch base offset might be less than the start offset. Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(baseOffset); - // We might find a batch with floor entry but not necessarily that batch has an overlap, - // if the request batch base offset is ahead of last offset from floor entry i.e. cached - // batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap. - // Such scenario will be handled in the next step when considering the subMap. However, - // if the floor entry is found and the request batch base offset is within the floor entry - // then adjust the base offset to the floor entry so that acquire method can still work on - // previously cached batch boundaries. - if (floorEntry != null && floorEntry.getValue().lastOffset() >= baseOffset) { + if (floorEntry == null) { + // The initialize method check that there couldn't be any batches prior to the start offset. + // And once share partition starts fetching records, it will always fetch records, at least, + // from the start offset, but there could be cases where the batch base offset is prior + // to the start offset. This can happen when the share partition is initialized with + // partial persisted state and moved start offset i.e. start offset is not the batch's + // first offset. In such case, we need to adjust the base offset to the start offset. + // It's safe to adjust the base offset to the start offset when there isn't any floor + // i.e. no cached batches available prior to the request batch base offset. Hence, + // check for the floor entry and adjust the base offset accordingly. + if (baseOffset < startOffset) { + log.info("Adjusting base offset for the fetch as it's prior to start offset: {}-{}" + + "from {} to {}", groupId, topicIdPartition, baseOffset, startOffset); + baseOffset = startOffset; + } + } else if (floorEntry.getValue().lastOffset() >= baseOffset) { + // We might find a batch with floor entry but not necessarily that batch has an overlap, + // if the request batch base offset is ahead of last offset from floor entry i.e. cached + // batch of 10-14 and request batch of 15-18, though floor entry is found but no overlap. + // Such scenario will be handled in the next step when considering the subMap. However, + // if the floor entry is found and the request batch base offset is within the floor entry + // then adjust the base offset to the floor entry so that acquire method can still work on + // previously cached batch boundaries. baseOffset = floorEntry.getKey(); } // Validate if the fetch records are already part of existing batches and if available. @@ -755,7 +781,8 @@ public class SharePartition { result.addAll(shareAcquiredRecords.acquiredRecords()); acquiredCount += shareAcquiredRecords.count(); } - // Set nextBatchStartOffset as the last offset of the current in-flight batch + 1 + // Set nextBatchStartOffset as the last offset of the current in-flight batch + 1. + // Hence, after the loop iteration the next gap can be considered. maybeGapStartOffset = inFlightBatch.lastOffset() + 1; // If the acquired count is equal to the max fetch records then break the loop. if (acquiredCount >= maxRecordsToAcquire) { @@ -1057,10 +1084,24 @@ public class SharePartition { /** * Updates the cached state, start and end offsets of the share partition as per the new log * start offset. 
The method is called when the log start offset is moved for the share partition. + * <p> + * This method only archives the available records in the cached state that are before the new log + * start offset. It does not persist the archived state batches to the persister, rather it + * updates the cached state and offsets to reflect the new log start offset. The state in persister + * will be updated lazily during the acknowledge/release records API calls or acquisition lock timeout. + * <p> + * The AVAILABLE state records can either have ongoing state transition or not. Hence, the archive + * records method will update the state of the records to ARCHIVED and set the terminal state flag + * hence if the transition is rolled back then the state will not be AVAILABLE again. However, + * the ACQUIRED state records will not be archived as they are still in-flight and acknowledge + * method also do not allow the state update for any offsets post the log start offset, hence those + * records will only be archived once acquisition lock timeout occurs. * * @param logStartOffset The new log start offset. */ void updateCacheAndOffsets(long logStartOffset) { + log.debug("Updating cached states for share partition: {}-{} with new log start offset: {}", + groupId, topicIdPartition, logStartOffset); lock.writeLock().lock(); try { if (logStartOffset <= startOffset) { @@ -1432,7 +1473,11 @@ public class SharePartition { lock.writeLock().lock(); try { if (initialReadGapOffset != null) { - if (initialReadGapOffset.endOffset() == endOffset) { + // When last cached batch for initial read gap window is acquired, then endOffset is + // same as the initialReadGapOffset's endOffset, but the gap offset to update is + // endOffset + 1. Hence, do not update the gap start offset if the request offset + // is ahead of the endOffset. + if (initialReadGapOffset.endOffset() == endOffset && offset <= initialReadGapOffset.endOffset()) { initialReadGapOffset.gapStartOffset(offset); } else { // The initial read gap offset is not valid anymore as the end offset has moved @@ -1445,6 +1490,15 @@ public class SharePartition { } } + /** + * The method calculates the last offset and maximum records to acquire. The adjustment is needed + * to ensure that the records acquired do not exceed the maximum in-flight messages limit. + * + * @param fetchOffset The offset from which the records are fetched. + * @param maxFetchRecords The maximum number of records to acquire. + * @param lastOffset The last offset to acquire records to, which is the last offset of the fetched batch. + * @return LastOffsetAndMaxRecords object, containing the last offset to acquire and the maximum records to acquire. + */ private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) { // There can always be records fetched exceeding the max in-flight messages limit. Hence, // we need to check if the share partition has reached the max in-flight messages limit @@ -1512,6 +1566,20 @@ public class SharePartition { // which falls under the max messages limit. As the max fetch records is the soft // limit, the last offset can be higher than the max messages. lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1); + // If the initial read gap offset window is active then it's not guaranteed that the + // batches align on batch boundaries. 
Hence, reset to last offset itself if the batch's + // last offset is greater than the last offset for acquisition, else there could be + // a situation where the batch overlaps with the initial read gap offset window batch. + // For example, if the initial read gap offset window is 10-30 i.e. initialReadGapOffset's + // startOffset is 10 and endOffset is 30, and the first persister's read batch is 15-30. + // Say first fetched batch from log is 10-30 and maxFetchRecords is 1, then the lastOffset + // in this method call would be 14. As the maxFetchRecords is lesser than the batch, + // hence last batch offset for request offset is fetched. In this example it will + // be 30, hence check if the initial read gap offset window is active and the last acquired + // offset should be adjusted to 14 instead of 30. + if (isInitialReadGapOffsetWindowActive() && lastAcquiredOffset > lastOffset) { + lastAcquiredOffset = lastOffset; + } } // Create batches of acquired records. diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index ba24f3b2595..75f27ec5382 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -1088,6 +1088,672 @@ public class SharePartitionTest { assertNull(initialReadGapOffset); } + @Test + public void testMaybeInitializeAndAcquire() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + + // Create a single batch record that covers the entire range from 10 to 30 of initial read gap. 
+ // The records in the batch are from 10 to 49. + MemoryRecords records = memoryRecords(40, 10); + // Set max fetch records to 1, records will be acquired till the first gap is encountered. + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 1, + 10, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 5); + + assertArrayEquals(expectedAcquiredRecord(10, 14, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(15, sharePartition.nextFetchOffset()); + assertEquals(4, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.cachedState().get(10L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(10L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(10L).offsetState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(15L, sharePartition.initialReadGapOffset().gapStartOffset()); + + // Send the same batch again to acquire the next set of records. + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 10, + 15, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 13); + + List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(31, sharePartition.nextFetchOffset()); + assertEquals(6, sharePartition.cachedState().size()); + assertEquals(19, sharePartition.cachedState().get(19L).firstOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState()); + assertEquals(1, sharePartition.cachedState().get(19L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(19L).offsetState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(23, sharePartition.cachedState().get(23L).firstOffset()); + assertEquals(25, sharePartition.cachedState().get(23L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); + assertEquals(1, sharePartition.cachedState().get(23L).batchDeliveryCount()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState()); + assertEquals(30L, sharePartition.endOffset()); + // As all the gaps are now filled, the initialReadGapOffset should be null. + assertNull(sharePartition.initialReadGapOffset()); + + // Now initial read gap is filled, so the complete batch can be acquired despite max fetch records being 1. 
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 1, + 31, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 19); + + assertArrayEquals(expectedAcquiredRecord(31, 49, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(50, sharePartition.nextFetchOffset()); + assertEquals(7, sharePartition.cachedState().size()); + assertEquals(31, sharePartition.cachedState().get(31L).firstOffset()); + assertEquals(49, sharePartition.cachedState().get(31L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState()); + assertEquals(1, sharePartition.cachedState().get(31L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(31L).offsetState()); + assertEquals(49L, sharePartition.endOffset()); + } + + @Test + public void testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + + // Create a single batch record that covers the entire range from 10 to 30 of initial read gap. + // The records in the batch are from 10 to 49. + MemoryRecords records = memoryRecords(40, 10); + // Set max fetch records to 500, all records should be acquired. 
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 500, + 10, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 37); + + List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(10, 14, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(50, sharePartition.nextFetchOffset()); + assertEquals(7, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.cachedState().get(10L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(10L).lastOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).firstOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).lastOffset()); + assertEquals(23, sharePartition.cachedState().get(23L).firstOffset()); + assertEquals(25, sharePartition.cachedState().get(23L).lastOffset()); + assertEquals(31, sharePartition.cachedState().get(31L).firstOffset()); + assertEquals(49, sharePartition.cachedState().get(31L).lastOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState()); + assertEquals(49L, sharePartition.endOffset()); + // As all the gaps are now filled, the initialReadGapOffset should be null. 
+ assertNull(sharePartition.initialReadGapOffset()); + } + + @Test + public void testMaybeInitializeAndAcquireWithFetchBatchLastOffsetWithinCachedBatch() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + + // Create a single batch record that ends in between the cached batch and the fetch offset is + // post startOffset. + MemoryRecords records = memoryRecords(16, 12); + // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch. 
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 500, + 10, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 13); + + List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(12, 14, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(28, sharePartition.nextFetchOffset()); + assertEquals(6, sharePartition.cachedState().size()); + assertEquals(12, sharePartition.cachedState().get(12L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(12L).lastOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).firstOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).lastOffset()); + assertEquals(23, sharePartition.cachedState().get(23L).firstOffset()); + assertEquals(25, sharePartition.cachedState().get(23L).lastOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(12L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); + assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(26L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).offsetState().get(26L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).offsetState().get(27L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(28L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state()); + assertEquals(30L, sharePartition.endOffset()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(28L, sharePartition.initialReadGapOffset().gapStartOffset()); + } + + @Test + public void testMaybeInitializeAndAcquireWithFetchBatchPriorStartOffset() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + 
assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + + // Create a single batch record where first offset is prior startOffset. + MemoryRecords records = memoryRecords(16, 6); + // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch. + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 500, + 10, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 10); + + List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(10, 14, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(23, sharePartition.nextFetchOffset()); + assertEquals(5, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.cachedState().get(10L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(10L).lastOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).firstOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).lastOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertEquals(30L, sharePartition.endOffset()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(20L, sharePartition.initialReadGapOffset().gapStartOffset()); + } + + @Test + public void testMaybeInitializeAndAcquireWithMultipleBatches() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + 
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(5, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(5, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(5L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + + // Create multiple batch records that covers the entire range from 5 to 30 of initial read gap. + // The records in the batch are from 5 to 49. + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 2, 5).close(); + memoryRecordsBuilder(buffer, 1, 8).close(); + memoryRecordsBuilder(buffer, 2, 10).close(); + memoryRecordsBuilder(buffer, 6, 13).close(); + memoryRecordsBuilder(buffer, 3, 19).close(); + memoryRecordsBuilder(buffer, 9, 22).close(); + memoryRecordsBuilder(buffer, 19, 31).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + // Set max fetch records to 1, records will be acquired till the first gap is encountered. + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 1, + 5L, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 2); + + assertArrayEquals(expectedAcquiredRecord(5, 6, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(7, sharePartition.nextFetchOffset()); + assertEquals(4, sharePartition.cachedState().size()); + assertEquals(5, sharePartition.cachedState().get(5L).firstOffset()); + assertEquals(6, sharePartition.cachedState().get(5L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState()); + assertEquals(1, sharePartition.cachedState().get(5L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(5L).offsetState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(7L, sharePartition.initialReadGapOffset().gapStartOffset()); + + // Remove first batch from the records as the fetch offset has moved forward to 7 offset. 
+ List<RecordBatch> batch = TestUtils.toList(records.batches()); + records = records.slice(batch.get(0).sizeInBytes(), records.sizeInBytes() - batch.get(0).sizeInBytes()); + // Send the batch again to acquire the next set of records. + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 3, + 7L, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 4); + + assertArrayEquals(expectedAcquiredRecord(8, 11, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(12, sharePartition.nextFetchOffset()); + assertEquals(5, sharePartition.cachedState().size()); + assertEquals(8, sharePartition.cachedState().get(8L).firstOffset()); + assertEquals(11, sharePartition.cachedState().get(8L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(8L).batchState()); + assertEquals(1, sharePartition.cachedState().get(8L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(8L).offsetState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertEquals(30L, sharePartition.endOffset()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(12L, sharePartition.initialReadGapOffset().gapStartOffset()); + + // Remove the next 2 batches from the records as the fetch offset has moved forward to 12 offset. + int size = batch.get(1).sizeInBytes() + batch.get(2).sizeInBytes(); + records = records.slice(size, records.sizeInBytes() - size); + // Send the records with 8 as max fetch records to acquire new and existing cached batches. + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 8, + 12, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 10); + + List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(13, 14, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(26, sharePartition.nextFetchOffset()); + assertEquals(8, sharePartition.cachedState().size()); + assertEquals(13, sharePartition.cachedState().get(13L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(13L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(13L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(19, sharePartition.cachedState().get(19L).firstOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(23, sharePartition.cachedState().get(23L).firstOffset()); + assertEquals(25, sharePartition.cachedState().get(23L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertEquals(30L, sharePartition.endOffset()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(26L, 
sharePartition.initialReadGapOffset().gapStartOffset()); + + // Remove the next 2 batches from the records as the fetch offset has moved forward to 26 offset. + // Do not remove the 5th batch as it's only partially acquired. + size = batch.get(3).sizeInBytes() + batch.get(4).sizeInBytes(); + records = records.slice(size, records.sizeInBytes() - size); + // Send the records with 10 as max fetch records to acquire the existing and till end of the + // fetched data. + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 10, + 26, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 24); + + expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26, 30, 4)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(50, sharePartition.nextFetchOffset()); + assertEquals(9, sharePartition.cachedState().size()); + assertEquals(31, sharePartition.cachedState().get(31L).firstOffset()); + assertEquals(49, sharePartition.cachedState().get(31L).lastOffset()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState()); + assertEquals(49L, sharePartition.endOffset()); + // As all the gaps are now filled, the initialReadGapOffset should be null. + assertNull(sharePartition.initialReadGapOffset()); + } + + @Test + public void testMaybeInitializeAndAcquireWithMultipleBatchesAndLastOffsetWithinCachedBatch() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 3, 5L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(5, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(5, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(5L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, 
sharePartition.initialReadGapOffset().endOffset()); + + // Create multiple batch records that ends in between the cached batch and the fetch offset is + // post startOffset. + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 2, 7).close(); + memoryRecordsBuilder(buffer, 2, 10).close(); + memoryRecordsBuilder(buffer, 6, 13).close(); + // Though 19 offset is a gap but still be acquired. + memoryRecordsBuilder(buffer, 8, 20).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch. + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 500, + 5, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 18); + + List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(7, 14, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(28, sharePartition.nextFetchOffset()); + assertEquals(6, sharePartition.cachedState().size()); + assertEquals(7, sharePartition.cachedState().get(7L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(7L).lastOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).firstOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).lastOffset()); + assertEquals(23, sharePartition.cachedState().get(23L).firstOffset()); + assertEquals(25, sharePartition.cachedState().get(23L).lastOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(7L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); + assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(26L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).offsetState().get(26L).state()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).offsetState().get(27L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(28L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state()); + assertEquals(30L, sharePartition.endOffset()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(28L, sharePartition.initialReadGapOffset().gapStartOffset()); + } + + @Test + public void testMaybeInitializeAndAcquireWithMultipleBatchesPriorStartOffset() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + 
PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + List.of( + new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2), + new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(18, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(22, sharePartition.cachedState().get(20L).lastOffset()); + assertEquals(30, sharePartition.cachedState().get(26L).lastOffset()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset()); + assertEquals(30L, sharePartition.initialReadGapOffset().endOffset()); + + // Create multiple batch records where multiple batches base offsets are prior startOffset. + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 2, 3).close(); + memoryRecordsBuilder(buffer, 1, 6).close(); + memoryRecordsBuilder(buffer, 4, 8).close(); + memoryRecordsBuilder(buffer, 10, 13).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch. 
+ List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 500, + 10, + fetchPartitionData(records), + FETCH_ISOLATION_HWM), + 10); + + List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(10, 14, 1)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3)); + expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1)); + + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + assertEquals(23, sharePartition.nextFetchOffset()); + assertEquals(5, sharePartition.cachedState().size()); + assertEquals(10, sharePartition.cachedState().get(10L).firstOffset()); + assertEquals(14, sharePartition.cachedState().get(10L).lastOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).firstOffset()); + assertEquals(19, sharePartition.cachedState().get(19L).lastOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState()); + assertEquals(30L, sharePartition.endOffset()); + assertNotNull(sharePartition.initialReadGapOffset()); + assertEquals(20L, sharePartition.initialReadGapOffset().gapStartOffset()); + } + @Test public void testAcquireSingleRecord() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder() diff --git a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java index bc29f37c62f..96efa5eb29e 100644 --- a/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java +++ b/server/src/main/java/org/apache/kafka/server/share/acknowledge/ShareAcknowledgementBatch.java @@ -24,6 +24,9 @@ import java.util.List; * The class abstracts the acknowledgement request for <code>SharePartition</code> class constructed * from {@link org.apache.kafka.common.message.ShareFetchRequestData.AcknowledgementBatch} and * {@link org.apache.kafka.common.message.ShareAcknowledgeRequestData.AcknowledgementBatch} classes. + * <p> + * Acknowledge types are represented as a list of bytes, where each byte corresponds to an acknowledge + * type defined in {@link org.apache.kafka.clients.consumer.AcknowledgeType}. */ public record ShareAcknowledgementBatch( long firstOffset,
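The core fix described in the commit message above can be illustrated with a small standalone sketch. The class below is not the SharePartition implementation; the names and the way the acquisition boundary is derived are simplified assumptions, kept only to reproduce the example of start offset 10, cached batch [15-18], fetched batch [10-19] and a small maxFetchRecords.
```
import java.util.NavigableMap;
import java.util.TreeMap;

// Minimal standalone sketch (not the actual SharePartition code): reproduces the commit
// message example where the start offset is 10, the persisted cache already holds batch
// [15-18], the fetched batch is [10-19] and maxFetchRecords is smaller than the batch size.
public class GapWindowClampSketch {

    public static void main(String[] args) {
        // Cached in-flight batches keyed by base offset -> last offset (from persisted state).
        NavigableMap<Long, Long> cachedBatches = new TreeMap<>();
        cachedBatches.put(15L, 18L);

        long firstAcquiredOffset = 10L;    // fetch starts at the share partition start offset
        long fetchedBatchLastOffset = 19L; // the single fetched batch is [10-19]
        // During the initial read gap window, acquisition may only run up to the offset just
        // before the next cached batch, i.e. 14 in this example.
        long lastOffset = cachedBatches.ceilingKey(firstAcquiredOffset) - 1;

        // With maxFetchRecords smaller than the batch, aligning to the end of the batch that
        // contains the request offset yields 19, which would overlap the cached batch [15-18].
        long lastAcquiredOffset = fetchedBatchLastOffset;

        boolean initialReadGapWindowActive = true;
        // The clamp from the commit message: cap the acquisition at lastOffset while the
        // initial read gap window is active.
        if (initialReadGapWindowActive && lastAcquiredOffset > lastOffset) {
            lastAcquiredOffset = lastOffset;
        }

        // Prints [10-14], which no longer overlaps the cached [15-18] batch.
        System.out.println("Acquired batch: [" + firstAcquiredOffset + "-" + lastAcquiredOffset + "]");
    }
}
```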
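The Math.max change in the next-fetch-offset logic can be modelled the same way. This is a simplified model of walking the cached batches, not the actual findNextFetchOffset method, and it assumes a plain map of batch boundaries; it reproduces the scenario from the code comment where the gap window starts at 5, the persisted cache holds 15-18, and fetches have already filled [5-6] and [8-11].
```
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model (not the actual SharePartition code): walk the cached in-flight batches
// and return the first offset, at or after the tracked gap start, that no batch covers.
public class NextFetchOffsetSketch {

    static long nextFetchOffset(NavigableMap<Long, Long> cachedBatches, long gapStartOffset) {
        for (Map.Entry<Long, Long> entry : cachedBatches.entrySet()) {
            if (entry.getKey() > gapStartOffset) {
                // There is a gap before this batch, so fetch from the gap start.
                return gapStartOffset;
            }
            // Without the max, batch [5-6] would drag the tracker back to 7 and the method
            // would return 7, an offset already known to contain no data. Natural gaps must
            // stay skipped, so keep the larger of the running gap start and lastOffset + 1.
            gapStartOffset = Math.max(entry.getValue() + 1, gapStartOffset);
        }
        return gapStartOffset;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> cachedBatches = new TreeMap<>();
        cachedBatches.put(5L, 6L);   // first fetch after initialization
        cachedBatches.put(8L, 11L);  // second fetch; offset 7 held no data (e.g. compacted away)
        cachedBatches.put(15L, 18L); // batch read from persisted state

        // After acquiring [8-11] the tracked gap start has already advanced to 12, because
        // offset 7 turned out to be a natural gap. Prints 12, not 7.
        System.out.println(nextFetchOffset(cachedBatches, 12L));
    }
}
```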
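Finally, the base-offset adjustment commented in the acquire path can be sketched in isolation. Again this is a simplified assumption-laden model rather than the real code: it only shows the two cases the comments describe, snapping to an overlapping cached batch boundary and trimming a fetched batch that starts before the moved start offset.
```
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model (not the actual acquire code) of how the request batch base offset is
// adjusted before acquisition.
public class BaseOffsetAdjustmentSketch {

    static long adjustBaseOffset(NavigableMap<Long, Long> cachedBatches, long startOffset, long baseOffset) {
        Map.Entry<Long, Long> floorEntry = cachedBatches.floorEntry(baseOffset);
        if (floorEntry == null) {
            // No cached batch at or before the request batch: with a moved start offset the
            // fetched batch may begin earlier than the start offset, so trim it.
            return Math.max(baseOffset, startOffset);
        } else if (floorEntry.getValue() >= baseOffset) {
            // The request batch overlaps a cached batch: align to the cached batch boundary so
            // acquisition keeps working on previously cached batch boundaries.
            return floorEntry.getKey();
        }
        return baseOffset;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> cachedBatches = new TreeMap<>();
        cachedBatches.put(15L, 18L); // persisted batch; share partition start offset is 10

        System.out.println(adjustBaseOffset(cachedBatches, 10L, 6L));  // 10: fetched batch starts before start offset
        System.out.println(adjustBaseOffset(cachedBatches, 10L, 16L)); // 15: overlaps the cached batch
        System.out.println(adjustBaseOffset(cachedBatches, 10L, 12L)); // 12: no overlap, keep as-is
    }
}
```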