This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new cf579edd268 KAFKA-19954: Fixing overlapping batches post re-initialization of Share Partition (#21051)
cf579edd268 is described below
commit cf579edd2680e0cc3031bc200be35c1490ab53d8
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Dec 2 20:50:33 2025 +0000
KAFKA-19954: Fixing overlapping batches post re-initialization of Share Partition (#21051)
The PR fixes an issue that occurs when a SharePartition is re-initialized and
fetching starts prior to the start offset. In edge cases, the `acquire` method
can create overlapping batches in the cache. The issue is more evident in
`record_limit` mode.
Consider a scenario where the log has a single 0-99 batch and, post
re-initialization of the share partition, the start offset becomes 5 because
the first 5 records were already acknowledged by the previous share-partition
instance. Prior to the fix:
- A request arrives to fetch the next 5 offsets, hence the cache holds 5-99,
with offsets 5-9 acquired.
- The client acknowledges offsets 5-9 and the start offset moves to 10.
- The client re-fetches from offset 10 and gets the same 0-99 batch.
- Acquire readjusts the base offset to 10 as the start offset has moved.
- There is no overlapping batch in the cache for 10-99 as the cache key is 5.
- Hence a 0-99 cache entry is created, overlapping the existing 5-99 entry.
Post fix:
- The overlapping entry in the cache is found, as the lookup now starts from
the minimum of the adjusted baseOffset and the batch's firstOffset.
Also fixed other scenarios where, due to a startOffset move with no cache
overlap, offsets could be acquired prior to startOffset.
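To make the before/after lookup concrete, here is a minimal, self-contained
sketch (not the actual SharePartition types; the in-flight cache is modelled
as a plain NavigableMap keyed by each cached batch's base offset, holding
[firstOffset, lastOffset]):

import java.util.NavigableMap;
import java.util.TreeMap;

public class OverlapLookupSketch {
    public static void main(String[] args) {
        NavigableMap<Long, long[]> cachedState = new TreeMap<>();
        cachedState.put(5L, new long[]{5L, 99L}); // cached batch 5-99, keyed at 5

        long fetchBatchBaseOffset = 0L;  // the same 0-99 log batch is re-fetched
        long adjustedBaseOffset = 10L;   // acquire readjusts to the moved start offset
        long lastOffsetToAcquire = 99L;

        // Prior to the fix: the lookup starts at the adjusted base offset (10) and
        // misses the cached entry keyed at 5, so an overlapping 0-99 entry is created.
        NavigableMap<Long, long[]> before =
            cachedState.subMap(adjustedBaseOffset, true, lastOffsetToAcquire, true);
        System.out.println("prior to fix, overlap found: " + (!before.isEmpty())); // false

        // Post fix: the lookup starts at min(batch base offset, adjusted base offset),
        // so the existing 5-99 entry is detected as an overlap.
        NavigableMap<Long, long[]> after = cachedState.subMap(
            Math.min(fetchBatchBaseOffset, adjustedBaseOffset), true, lastOffsetToAcquire, true);
        System.out.println("post fix, overlap found: " + (!after.isEmpty())); // true
    }
}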
Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 27 ++-
.../kafka/server/share/SharePartitionTest.java | 210 +++++++++++++++++++++
2 files changed, 234 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 23a85611719..c6eb76129a6 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -736,6 +736,14 @@ public class SharePartition {
return ShareAcquiredRecords.empty();
}
+ // Though there shouldn't be any case where the fetch batch is prior to the start offset,
+ // as the fetch offset should have moved past the start offset, this check ensures that no
+ // records are acquired prior to the start offset.
+ if (lastBatch.lastOffset() < startOffset()) {
+ // Fetch batch is prior to start offset, nothing to acquire.
+ return ShareAcquiredRecords.empty();
+ }
+
LastOffsetAndMaxRecords lastOffsetAndMaxRecords = lastOffsetAndMaxRecordsToAcquire(fetchOffset, maxFetchRecords, lastBatch.lastOffset());
if (lastOffsetAndMaxRecords.maxRecords() <= 0) {
@@ -744,7 +752,7 @@ public class SharePartition {
// The lastOffsetAndMaxRecords contains the last offset to acquire and the maximum number of records
// to acquire.
int maxRecordsToAcquire = lastOffsetAndMaxRecords.maxRecords();
- long lastOffsetToAcquire = lastOffsetAndMaxRecords.lastOffset();
+ final long lastOffsetToAcquire = lastOffsetAndMaxRecords.lastOffset();
// We require the first batch of records to get the base offset. Stop parsing further
// batches.
@@ -788,7 +796,12 @@ public class SharePartition {
baseOffset = floorEntry.getKey();
}
// Validate if the fetch records are already part of existing batches and if available.
- NavigableMap<Long, InFlightBatch> subMap =
cachedState.subMap(baseOffset, true, lastOffsetToAcquire, true);
+ // The sub map is used to find the overlapping batches in the cache for the request batch.
+ // However, baseOffset might have been adjusted above, either moved ahead to align with
+ // startOffset or moved back to align with the floor entry, hence compute the min of the
+ // first batch's base offset and the adjusted base offset.
+ final NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(
+     Math.min(firstBatch.baseOffset(), baseOffset), true, lastOffsetToAcquire, true);
// No overlap with request offsets in the cache for in-flight records. Acquire the complete
// batch.
if (subMap.isEmpty()) {
@@ -796,8 +809,16 @@ public class SharePartition {
groupId, topicIdPartition);
// It's safe to use lastOffsetToAcquire instead of lastBatch.lastOffset() because there is no
// overlap hence the lastOffsetToAcquire is same as lastBatch.lastOffset() or before that.
+ // Also, the first offset to acquire should be baseOffset. The baseOffset could be adjusted
+ // either prior to the fetch batch's base offset, in which case there has to be a submap
+ // entry and this code path shall not be executed, or past the batch's base offset to
+ // startOffset, in which case acquire should honour the adjusted baseOffset. Consider the
+ // persister returns 5 as startOffset and a batch of 15-20 in ARCHIVED state. The fetch
+ // returns 0-10 as the first batch, then the baseOffset is adjusted to 5, the startOffset.
+ // As there is no cached batch from 0-10, the submap will be empty and the first offset
+ // of the batch to acquire should be 5, not 0.
ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(), isRecordLimitMode,
- firstBatch.baseOffset(), lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
+ baseOffset, lastOffsetToAcquire, batchSize, maxRecordsToAcquire);
return maybeFilterAbortedTransactionalAcquiredRecords(fetchPartitionData, isolationLevel, shareAcquiredRecords);
}
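For the no-overlap case described in the comment above (the persister returns start offset 5
with an ARCHIVED 15-20 batch and the fetch returns a 0-10 batch), here is a rough sketch of
why acquisition must honour the adjusted base offset rather than the fetched batch's base
offset. The Math.max adjustment below is a simplification standing in for the forward
adjustment performed in acquire(), not the actual method:

import java.util.NavigableMap;
import java.util.TreeMap;

public class NoOverlapAcquireSketch {
    public static void main(String[] args) {
        long startOffset = 5L;
        NavigableMap<Long, long[]> cachedState = new TreeMap<>();
        cachedState.put(15L, new long[]{15L, 20L}); // ARCHIVED 15-20 batch from the persister

        long fetchBatchBase = 0L;
        long fetchBatchLast = 10L;

        // Simplified stand-in for the baseOffset adjustment: never start before startOffset.
        long baseOffset = Math.max(fetchBatchBase, startOffset); // 5

        // Overlap lookup spans min(batch base offset, adjusted base offset) to the batch end.
        NavigableMap<Long, long[]> subMap =
            cachedState.subMap(Math.min(fetchBatchBase, baseOffset), true, fetchBatchLast, true);
        if (subMap.isEmpty()) {
            // No cached overlap in 0-10: acquire a new batch starting at the adjusted
            // base offset (5-10), not at the fetched batch's base offset (0-10).
            System.out.println("acquire new batch " + baseOffset + "-" + fetchBatchLast);
        }
    }
}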
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index b8e2d1f81ad..f84f64afdb1 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -2520,6 +2520,109 @@ public class SharePartitionTest {
assertEquals(25, sharePartition.nextFetchOffset());
}
+ @Test
+ public void testAcquireBatchPriorToStartOffset() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(5L, 99L,
RecordState.AVAILABLE.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
+ sharePartition.maybeInitialize();
+
+ // Validate the cached state after initialization.
+ assertEquals(5, sharePartition.nextFetchOffset());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
+
+ // Acquire offsets prior to start offset. Should not acquire any records.
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ 10,
+ 10,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(memoryRecords(0, 5)),
+ FETCH_ISOLATION_HWM),
+ 0);
+
+ // Validate the cached state remains unchanged.
+ assertEquals(5, sharePartition.nextFetchOffset());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
+ }
+
+ /**
+ * Test validates the scenario where the partition has been re-initialized, the start offset
+ * falls in the middle of the first fetch batch, and there is no overlap with the batch
+ * returned from the persister during initialization. In this case, the acquire logic
+ * should respect the start offset and not allow acquiring records prior to it.
+ */
+ @Test
+ public void
testAcquireBatchWithMovedStartOffsetAndNoOverlapWithCachedBatch() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(15L, 20L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
+ sharePartition.maybeInitialize();
+
+ // Validate the cached state after initialization.
+ assertEquals(5, sharePartition.nextFetchOffset());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(15, sharePartition.cachedState().get(15L).firstOffset());
+ assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(15L).batchState());
+ // As there is a gap for offsets 5-14, a gap window should be created.
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(5,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+
+ // Acquire offsets starting prior to the start offset and going beyond it. Only offsets
+ // 5-9 should be acquired.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ 10,
+ 10,
+ 5L,
+ fetchPartitionData(memoryRecords(0, 10)),
+ FETCH_ISOLATION_HWM),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecord(5, 9, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(10, sharePartition.nextFetchOffset());
+ assertEquals(5L, sharePartition.startOffset());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(9, sharePartition.cachedState().get(5L).lastOffset());
+ assertNotNull(sharePartition.persisterReadResultGapWindow());
+ assertEquals(10,
sharePartition.persisterReadResultGapWindow().gapStartOffset());
+ }
+
@Test
public void testNextFetchOffsetInitialState() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
@@ -10635,6 +10738,113 @@ public class SharePartitionTest {
assertEquals(5,
sharePartitionMetrics.inFlightBatchMessageCount().max());
}
+ /**
+ * Test validates the scenario where the partition has been re-initialized with a moved start
+ * offset. In this case, the acquire logic should respect the new start offset and not allow
+ * acquiring records before it. The test simulates a scenario where the log batch spans
+ * offsets 0-99 but, post re-initialization, the start offset has moved to 5, hence the share
+ * partition should have offsets 5-99 in AVAILABLE state in the cache. Post acquire and
+ * acknowledge of offsets 5-14, the start offset moves to 15. The next acquire on the same
+ * log batch should only allow acquiring offsets from 15 onwards and should not create any
+ * other entries in the cache.
+ */
+ @Test
+ public void testAcquireBatchInRecordLimitModeWithMovedStartOffset() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(5L, 99L,
RecordState.AVAILABLE.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withSharePartitionMetrics(sharePartitionMetrics)
+ .build();
+ sharePartition.maybeInitialize();
+
+ // Validate the cached state after initialization.
+ assertEquals(5, sharePartition.nextFetchOffset());
+ assertEquals(5, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
+
+ // Send the batch of 0-99 offsets for acquire; only 5-14 should be acquired. Though the
+ // start offset for the share partition is 5 and the log batch base offset is 0, acquire
+ // should respect the share partition start offset.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ 10,
+ 10,
+ 5L,
+ fetchPartitionData(memoryRecords(0, 100)),
+ FETCH_ISOLATION_HWM),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecords(5, 14, 2).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(15, sharePartition.nextFetchOffset());
+ assertEquals(5L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+
+ // Offset state should be maintained since only some offsets in the batch are acquired.
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(5L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(15L).state());
+
+ // Acknowledge the acquired offsets 5-14 so the start offset moves to 15.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(5, 14,
List.of(AcknowledgeType.ACCEPT.id))
+ ));
+
+ assertEquals(15, sharePartition.nextFetchOffset());
+ assertEquals(15L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(15L).state());
+
+ // Re-acquire on the same log batch of 0-99 offsets, only 15-24 should be acquired.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ 10,
+ 10,
+ 15,
+ fetchPartitionData(memoryRecords(0, 100)),
+ FETCH_ISOLATION_HWM),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecords(15, 24, 2).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(25, sharePartition.nextFetchOffset());
+ assertEquals(15L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(5, sharePartition.cachedState().get(5L).firstOffset());
+ assertEquals(99, sharePartition.cachedState().get(5L).lastOffset());
+
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(15L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(24L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(25L).state());
+ }
+
@Test
public void testAcknowledgeInRecordLimitMode() {
Persister persister = Mockito.mock(Persister.class);