This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 01587d09d82 KAFKA-18494-3: solution for the bug relating to gaps in
the share partition cachedStates post initialization (#18696)
01587d09d82 is described below
commit 01587d09d82861144f57ba87067c01e72329a127
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Wed Feb 5 20:46:25 2025 +0530
KAFKA-18494-3: solution for the bug relating to gaps in the share partition
cachedStates post initialization (#18696)
Reviewers: Apoorv Mittal <[email protected]>, Abhinav Dixit
<[email protected]>, Andrew Schofield <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 161 ++-
.../kafka/server/share/SharePartitionTest.java | 1350 +++++++++++++++++---
2 files changed, 1352 insertions(+), 159 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 65dd7374720..2ba8574f785 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -287,6 +287,12 @@ public class SharePartition {
*/
private long endOffset;
+ /**
+ * The initial read gap offset tracks if there are any gaps in the
in-flight batch during initial
+ * read of the share partition state from the persister.
+ */
+ private InitialReadGapOffset initialReadGapOffset;
+
/**
* We maintain the latest fetch offset and its metadata to estimate the
minBytes requirement more efficiently.
*/
@@ -444,6 +450,11 @@ public class SharePartition {
stateEpoch = partitionData.stateEpoch();
List<PersisterStateBatch> stateBatches =
partitionData.stateBatches();
+ long gapStartOffset = -1;
+ // The previousBatchLastOffset is used to track the last
offset of the previous batch.
+ // For the first batch that should ideally start from
startOffset if there are no gaps,
+ // we assume the previousBatchLastOffset to be startOffset - 1.
+ long previousBatchLastOffset = startOffset - 1;
for (PersisterStateBatch stateBatch : stateBatches) {
if (stateBatch.firstOffset() < startOffset) {
log.error("Invalid state batch found for the share
partition: {}-{}. The base offset: {}"
@@ -452,6 +463,10 @@ public class SharePartition {
throwable = new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition));
return;
}
+ if (gapStartOffset == -1 && stateBatch.firstOffset() >
previousBatchLastOffset + 1) {
+ gapStartOffset = previousBatchLastOffset + 1;
+ }
+ previousBatchLastOffset = stateBatch.lastOffset();
InFlightBatch inFlightBatch = new
InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
stateBatch.lastOffset(),
RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(),
null);
cachedState.put(stateBatch.firstOffset(), inFlightBatch);
@@ -462,6 +477,10 @@ public class SharePartition {
// in the cached state are not missed
findNextFetchOffset.set(true);
endOffset =
cachedState.lastEntry().getValue().lastOffset();
+ // initialReadGapOffset is not required, if there are no
gaps in the read state response
+ if (gapStartOffset != -1) {
+ initialReadGapOffset = new
InitialReadGapOffset(endOffset, gapStartOffset);
+ }
// In case the persister read state RPC result contains no
AVAILABLE records, we can update cached state
// and start/end offsets.
maybeUpdateCachedStateAndOffsets();
@@ -538,8 +557,20 @@ public class SharePartition {
}
long nextFetchOffset = -1;
-
+ long gapStartOffset = isInitialReadGapOffsetWindowActive() ?
initialReadGapOffset.gapStartOffset() : -1;
for (Map.Entry<Long, InFlightBatch> entry :
cachedState.entrySet()) {
+ // Check if there exists any gap in the in-flight batch which
needs to be fetched. If
+ // initialReadGapOffset's endOffset is equal to the share
partition's endOffset, then
+ // only the initial gaps should be considered. Once share
partition's endOffset is past
+ // initial read end offset then all gaps are anyway fetched.
+ if (isInitialReadGapOffsetWindowActive()) {
+ if (entry.getKey() > gapStartOffset) {
+ nextFetchOffset = gapStartOffset;
+ break;
+ }
+ gapStartOffset = entry.getValue().lastOffset() + 1;
+ }
+
// Check if the state is maintained per offset or batch. If
the offsetState
// is not maintained then the batch state is used to determine
the offsets state.
if (entry.getValue().offsetState() == null) {
@@ -638,6 +669,9 @@ public class SharePartition {
List<AcquiredRecords> result = new ArrayList<>();
// The acquired count is used to track the number of records
acquired for the request.
int acquiredCount = 0;
+ // This tracks whether there is a gap between the subMap entries.
If a gap is found, we will acquire
+ // the corresponding offsets in a separate batch.
+ long maybeGapStartOffset = baseOffset;
// The fetched records are already part of the in-flight records.
The records might
// be available for re-delivery hence try acquiring same. The
request batches could
// be an exact match, subset or span over multiple already fetched
batches.
@@ -648,6 +682,26 @@ public class SharePartition {
}
InFlightBatch inFlightBatch = entry.getValue();
+ // If the initialReadGapOffset window is active, we need to
treat the gaps in between the window as
+ // acquirable. Once the window is inactive (when we have
acquired all the gaps inside the window),
+ // the remaining gaps are natural (data does not exist at
those offsets) and we need not acquire them.
+ if (isInitialReadGapOffsetWindowActive()) {
+ // If maybeGapStartOffset is less than the key of the
entry, this means the fetch happened for a gap in the cachedState.
+ // Thus, a new batch needs to be acquired for the gap.
+ if (maybeGapStartOffset < entry.getKey()) {
+ ShareAcquiredRecords shareAcquiredRecords =
acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+ maybeGapStartOffset, entry.getKey() - 1,
batchSize, maxFetchRecords);
+ result.addAll(shareAcquiredRecords.acquiredRecords());
+ acquiredCount += shareAcquiredRecords.count();
+ }
+ // Set maybeGapStartOffset as the last offset of the
current in-flight batch + 1
+ maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
+ // If the acquired count is equal to the max fetch records
then break the loop.
+ if (acquiredCount >= maxFetchRecords) {
+ break;
+ }
+ }
+
// Compute if the batch is a full match.
boolean fullMatch = checkForFullMatch(inFlightBatch,
firstBatch.baseOffset(), lastBatch.lastOffset());
@@ -715,6 +769,9 @@ public class SharePartition {
result.addAll(shareAcquiredRecords.acquiredRecords());
acquiredCount += shareAcquiredRecords.count();
}
+ if (!result.isEmpty()) {
+ maybeUpdateReadGapFetchOffset(result.get(result.size() -
1).lastOffset() + 1);
+ }
return new ShareAcquiredRecords(result, acquiredCount);
} finally {
lock.writeLock().unlock();
@@ -1177,6 +1234,24 @@ public class SharePartition {
};
}
+ // Method to reduce the window that tracks gaps in the cachedState
+ private void maybeUpdateReadGapFetchOffset(long offset) {
+ lock.writeLock().lock();
+ try {
+ if (initialReadGapOffset != null) {
+ if (initialReadGapOffset.endOffset() == endOffset) {
+ initialReadGapOffset.gapStartOffset(offset);
+ } else {
+ // The initial read gap offset is not valid anymore as the
end offset has moved
+ // beyond the initial read gap offset. Hence, reset the
initial read gap offset.
+ initialReadGapOffset = null;
+ }
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
private ShareAcquiredRecords acquireNewBatchRecords(
String memberId,
Iterable<? extends RecordBatch> batches,
@@ -1212,7 +1287,15 @@ public class SharePartition {
if (cachedState.firstKey() == firstAcquiredOffset) {
startOffset = firstAcquiredOffset;
}
- endOffset = lastAcquiredOffset;
+
+ // If the new batch acquired is part of a gap in the cachedState,
then endOffset should not be updated.
+ // Ex. if startOffset is 10, endOffset is 30, there is a gap from
10 to 20, and an inFlight batch from 21 to 30.
+ // In this case, the nextFetchOffset results in 10 and the records
are fetched. A new batch is acquired from
+ // 10 to 20, but the endOffset remains at 30.
+ if (lastAcquiredOffset > endOffset) {
+ endOffset = lastAcquiredOffset;
+ }
+ maybeUpdateReadGapFetchOffset(lastAcquiredOffset + 1);
return new ShareAcquiredRecords(acquiredRecords, (int)
(lastAcquiredOffset - firstAcquiredOffset + 1));
} finally {
lock.writeLock().unlock();
@@ -1791,10 +1874,23 @@ public class SharePartition {
long firstKeyToRemove = cachedState.firstKey();
long lastKeyToRemove;
NavigableMap.Entry<Long, InFlightBatch> entry =
cachedState.floorEntry(lastOffsetAcknowledged);
+ // If the lastOffsetAcknowledged is equal to the last offset of
entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+ if (isInitialReadGapOffsetWindowActive()) {
+ // This case will arise if we have a situation where there
is an acquirable gap after the lastOffsetAcknowledged.
+ // Ex, the cachedState has following state batches -> {(0,
10), (11, 20), (31,40)} and all these batches are acked.
+ // There is a gap from 21 to 30. Let the
initialReadGapOffset.gapStartOffset be 21. In this case,
+ // lastOffsetAcknowledged will be 20, but we cannot simply
move the start offset to the first offset
+ // of next cachedState batch (next cachedState batch is 31
to 40). There is an acquirable gap in between (21 to 30)
+ // and the startOffset should be at 21. Hence, we set
startOffset to the minimum of initialReadGapOffset.gapStartOffset
+ // and higher key of lastOffsetAcknowledged
+ startOffset =
Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+ }
lastKeyToRemove = entry.getKey();
} else {
+ // The code will reach this point only if
lastOffsetAcknowledged is in the middle of some stateBatch. In this case
+ // we can simply move the startOffset to the next offset of
lastOffsetAcknowledged and need not consider any read gap offsets.
startOffset = lastOffsetAcknowledged + 1;
if (entry.getKey().equals(cachedState.firstKey())) {
// If the first batch in cachedState has some records yet
to be acknowledged,
@@ -1825,8 +1921,17 @@ public class SharePartition {
NavigableMap.Entry<Long, InFlightBatch> entry =
cachedState.floorEntry(startOffset);
if (entry == null) {
- log.error("The start offset: {} is not found in the cached state
for share partition: {}-{}."
- + " Cannot move the start offset.", startOffset, groupId,
topicIdPartition);
+ // The start offset is not found in the cached state when there is
a gap starting at the start offset.
+ // For example, if the start offset is 10 and the cached state has
batches -> { (21, 30), (31, 40) }.
+ // This case arises only when the share partition is initialized
and the read state response results in
+ // state batches containing gaps. This situation is possible in
the case where in the previous instance
+ // of this share partition, the gap offsets were fetched but not
acknowledged, and the next batch of offsets
+ // were fetched as well as acknowledged. In the above example,
possibly in the previous instance of the share
+ // partition, the batch 10-20 was fetched but not acknowledged and
the batch 21-30 was fetched and acknowledged.
+ // Thus, the persister has no clue about what happened with the
batch 10-20. During the re-initialization of
+ // the share partition, the start offset is set to 10 and the
cached state has the batch 21-30, resulting in a gap.
+ log.debug("The start offset: {} is not found in the cached state
for share partition: {}-{} " +
+ "as there is an acquirable gap at the beginning. Cannot move
the start offset.", startOffset, groupId, topicIdPartition);
return false;
}
RecordState startOffsetState = entry.getValue().offsetState == null ?
@@ -1835,6 +1940,10 @@ public class SharePartition {
return isRecordStateAcknowledged(startOffsetState);
}
+ private boolean isInitialReadGapOffsetWindowActive() {
+ return initialReadGapOffset != null &&
initialReadGapOffset.endOffset() == endOffset;
+ }
+
/**
* The record state is considered acknowledged if it is either
acknowledged or archived.
* These are terminal states for the record.
@@ -1847,12 +1956,18 @@ public class SharePartition {
return recordState == RecordState.ACKNOWLEDGED || recordState ==
RecordState.ARCHIVED;
}
- private long findLastOffsetAcknowledged() {
- lock.readLock().lock();
+ // Visible for testing
+ long findLastOffsetAcknowledged() {
long lastOffsetAcknowledged = -1;
+ lock.readLock().lock();
try {
for (NavigableMap.Entry<Long, InFlightBatch> entry :
cachedState.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
+
+ if (isInitialReadGapOffsetWindowActive() &&
inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
+ return lastOffsetAcknowledged;
+ }
+
if (inFlightBatch.offsetState() == null) {
if
(!isRecordStateAcknowledged(inFlightBatch.batchState())) {
return lastOffsetAcknowledged;
@@ -2205,6 +2320,40 @@ public class SharePartition {
return timer;
}
+ // Visible for testing
+ InitialReadGapOffset initialReadGapOffset() {
+ return initialReadGapOffset;
+ }
+
+ /**
+ * The InitialReadGapOffset class is used to record the gap start and end
offset of the probable gaps
+ * of available records which are neither known to Persister nor to
SharePartition. Share Partition
+ * will use this information to determine the next fetch offset and should
try to fetch the records
+ * in the gap.
+ */
+ // Visible for testing
+ static class InitialReadGapOffset {
+ private final long endOffset;
+ private long gapStartOffset;
+
+ InitialReadGapOffset(long endOffset, long gapStartOffset) {
+ this.endOffset = endOffset;
+ this.gapStartOffset = gapStartOffset;
+ }
+
+ long endOffset() {
+ return endOffset;
+ }
+
+ long gapStartOffset() {
+ return gapStartOffset;
+ }
+
+ void gapStartOffset(long gapStartOffset) {
+ this.gapStartOffset = gapStartOffset;
+ }
+ }
+
// Visible for testing
final class AcquisitionLockTimerTask extends TimerTask {
private final long expirationMs;
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 54cfa8cbd80..f4a04b3bb42 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -896,6 +896,167 @@ public class SharePartitionTest {
assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
}
+ @Test
+ public void testMaybeInitializeStateBatchesWithGapAtBeginning() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 10L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(15L, 20L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+ new PersisterStateBatch(21L, 30L,
RecordState.ARCHIVED.id, (short) 3)))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(10, sharePartition.startOffset());
+ assertEquals(30, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(10, sharePartition.nextFetchOffset());
+
+ assertEquals(2, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(15L));
+ assertNotNull(sharePartition.cachedState().get(21L));
+
+ assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(2,
sharePartition.cachedState().get(15L).batchDeliveryCount());
+ assertNull(sharePartition.cachedState().get(15L).offsetState());
+
+ assertEquals(30, sharePartition.cachedState().get(21L).lastOffset());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(21L).batchState());
+ assertEquals(3,
sharePartition.cachedState().get(21L).batchDeliveryCount());
+ assertNull(sharePartition.cachedState().get(21L).offsetState());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(10, initialReadGapOffset.gapStartOffset());
+ assertEquals(30, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testMaybeInitializeStateBatchesWithMultipleGaps() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 10L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(15L, 20L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+ new PersisterStateBatch(30L, 40L,
RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(10, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(10, sharePartition.nextFetchOffset());
+
+ assertEquals(2, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(15L));
+ assertNotNull(sharePartition.cachedState().get(30L));
+
+ assertEquals(20, sharePartition.cachedState().get(15L).lastOffset());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(2,
sharePartition.cachedState().get(15L).batchDeliveryCount());
+ assertNull(sharePartition.cachedState().get(15L).offsetState());
+
+ assertEquals(40, sharePartition.cachedState().get(30L).lastOffset());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(30L).batchState());
+ assertEquals(3,
sharePartition.cachedState().get(30L).batchDeliveryCount());
+ assertNull(sharePartition.cachedState().get(30L).offsetState());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(10, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testMaybeInitializeStateBatchesWithGapNotAtBeginning() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 15L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(15L, 20L,
RecordState.ACKNOWLEDGED.id, (short) 2),
+ new PersisterStateBatch(30L, 40L,
RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ // The start offset will be moved to 21, since the offsets 15 to 20
are acknowledged, and will be removed
+ // from cached state in the maybeUpdateCachedStateAndOffsets method
+ assertEquals(21, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(21, sharePartition.nextFetchOffset());
+
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(30L));
+
+ assertEquals(40, sharePartition.cachedState().get(30L).lastOffset());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(30L).batchState());
+ assertEquals(3,
sharePartition.cachedState().get(30L).batchDeliveryCount());
+ assertNull(sharePartition.cachedState().get(30L).offsetState());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(21, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testMaybeInitializeStateBatchesWithoutGaps() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 15L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(15L, 20L,
RecordState.ACKNOWLEDGED.id, (short) 2),
+ new PersisterStateBatch(21L, 30L,
RecordState.ARCHIVED.id, (short) 3)))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertTrue(sharePartition.cachedState().isEmpty());
+ assertEquals(31, sharePartition.startOffset());
+ assertEquals(31, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(31, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+
+ // Since there are no gaps present in the readState response,
initialReadGapOffset should be null
+ assertNull(initialReadGapOffset);
+ }
+
@Test
public void testAcquireSingleRecord() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
@@ -1926,189 +2087,992 @@ public class SharePartitionTest {
expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
- // Send the same fetch request batch again but only 2 offsets should
come as acquired.
- acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 20, 3, records,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 2);
+ // Send the same fetch request batch again but only 2 offsets should
come as acquired.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 20, 3, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 2);
+
+ assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(15, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void testAcquireReleasedRecordMultipleBatches() {
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+ // First fetch request with 5 records starting from offset 10.
+ MemoryRecords records1 = memoryRecords(5, 10);
+ // Second fetch request with 5 records starting from offset 15.
+ MemoryRecords records2 = memoryRecords(5, 15);
+ // Third fetch request with 5 records starting from offset 23, gap of
3 offsets.
+ MemoryRecords records3 = memoryRecords(5, 23);
+ // Fourth fetch request with 5 records starting from offset 28.
+ MemoryRecords records4 = memoryRecords(5, 28);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 40, 3, records1,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 30, 3, records2,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(20, sharePartition.nextFetchOffset());
+
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 30, 3, records3,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(28, sharePartition.nextFetchOffset());
+
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 30, 3, records4,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(33, sharePartition.nextFetchOffset());
+
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(23L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(28L).batchState());
+ assertNull(sharePartition.cachedState().get(10L).offsetState());
+ assertNull(sharePartition.cachedState().get(15L).offsetState());
+ assertNull(sharePartition.cachedState().get(23L).offsetState());
+ assertNull(sharePartition.cachedState().get(28L).offsetState());
+
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(12, 30,
Collections.singletonList((byte) 2))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ assertEquals(12, sharePartition.nextFetchOffset());
+ assertEquals(4, sharePartition.cachedState().size());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(10L).batchState());
+ assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
+ assertNull(sharePartition.cachedState().get(15L).offsetState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(15L).batchMemberId());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(23L).batchState());
+ assertNull(sharePartition.cachedState().get(23L).offsetState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(23L).batchMemberId());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(28L).batchState());
+ assertNotNull(sharePartition.cachedState().get(28L).offsetState());
+
+ Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
+
+ expectedOffsetStateMap.clear();
+ expectedOffsetStateMap.put(28L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(29L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(30L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(31L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(32L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(28L).offsetState());
+
+ // Send next batch from offset 12, only 3 records should be acquired.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 40, 3, records1,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 3);
+
+ assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ // Though record2 batch exists to acquire but send batch record3, it
should be acquired but
+ // next fetch offset should not move.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 40, 3, records3,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ // Acquire partial records from batch 2.
+ MemoryRecords subsetRecords = memoryRecords(2, 17);
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 2);
+
+ assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(),
acquiredRecordsList.toArray());
+ // Next fetch offset should not move.
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ // Acquire partial records from record 4 to further test if the next
fetch offset move
+ // accordingly once complete record 2 is also acquired.
+ subsetRecords = memoryRecords(1, 28);
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 1);
+
+ assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(),
acquiredRecordsList.toArray());
+ // Next fetch offset should not move.
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ // Try to acquire complete record 2 though it's already partially
acquired, the next fetch
+ // offset should move.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 20, 3, records2,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 3);
+
+ // Offsets 15, 16 and 19 should be acquired.
+ List<AcquiredRecords> expectedAcquiredRecords =
expectedAcquiredRecords(15, 16, 2);
+ expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ // Next fetch offset should move as all offsets up to 28 are now acquired.
+ assertEquals(29, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void testAcquireGapAtBeginningAndRecordsFetchedFromGap() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // All records fetched are part of the gap. The gap is from 11 to 20,
fetched offsets are 11 to 15.
+ MemoryRecords records = memoryRecords(5, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecord(11, 15, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(16, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(16, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightBatches() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Fetched offsets overlap the inFlight batches. The gap is from 11 to
20, but fetched records are from 11 to 25.
+ MemoryRecords records = memoryRecords(15, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(41, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(21, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightAvailableBatches() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Fetched offsets overlap the inFlight batches. The gap is from 11 to
20, but fetched records are from 11 to 25.
+ MemoryRecords records = memoryRecords(15, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 15);
+
+ // The gap from 11 to 20 will be acquired. Since the next batch is
AVAILABLE and the records
+ // fetched from the replica manager overlap with it, some records from
that batch will also be acquired
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 3));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(26, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(26, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireWhenCachedStateContainsGapsAndRecordsFetchedFromNonGapOffset() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(11L, 20L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Fetched records start in inFlightBatch 11-20 with state
AVAILABLE and overlap beyond it.
+ // The gap in the cached state is from 21 to 30; fetched records are
from 11 to 25.
+ MemoryRecords records = memoryRecords(15, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 15);
+
+ // 2 different batches will be acquired this time (11-20 and 21-25).
The first batch will have delivery count 3
+ // as previous deliveryCount was 2. The second batch will have
delivery count 1 as it is acquired for the first time.
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 25, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(26, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(26, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireGapAtBeginningAndFetchedRecordsOverlapMultipleInFlightBatches() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40
+ new PersisterStateBatch(61L, 70L,
RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60
+ new PersisterStateBatch(81L, 90L,
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ MemoryRecords records = memoryRecords(75, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 55);
+
+ // Acquired batches will contain the following ->
+ // 1. 11-20 (gap offsets)
+ // 2. 31-40 (gap offsets)
+ // 3. 41-50 (AVAILABLE batch in cachedState)
+ // 4. 51-60 (gap offsets)
+ // 5. 71-80 (gap offsets)
+ // 6. 81-85 (AVAILABLE batch in cachedState). These will be acquired
as separate batches because we are breaking a batch in the cachedState
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 80, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 81, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(82, 82, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(83, 83, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(84, 84, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(85, 85, 2));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(90, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(86, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(86, initialReadGapOffset.gapStartOffset());
+ assertEquals(90, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testAcquireGapAtBeginningAndFetchedRecordsEndJustBeforeGap() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.ACKNOWLEDGED.id, (short) 1), // There is a gap from 31 to 40
+ new PersisterStateBatch(61L, 70L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 51 to 60
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ MemoryRecords records = memoryRecords(20, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 20);
+
+ // Acquired batches will contain the following ->
+ // 1. 11-20 (gap offsets)
+ // 2. 21-30 (AVAILABLE batch in cachedState)
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 3));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(70, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(31, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(31, initialReadGapOffset.gapStartOffset());
+ assertEquals(70, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireGapAtBeginningAndFetchedRecordsIncludeGapOffsetsAtEnd() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40
+ new PersisterStateBatch(61L, 70L,
RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60
+ new PersisterStateBatch(81L, 90L,
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ MemoryRecords records = memoryRecords(65, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 45);
+
+ // Acquired batches will contain the following ->
+ // 1. 11-20 (gap offsets)
+ // 2. 31-40 (gap offsets)
+ // 3. 41-50 (AVAILABLE batch in cachedState)
+ // 4. 51-60 (gap offsets)
+ // 5. 71-75 (gap offsets). The gap is from 71 to 80, but the fetched
records end at 75. These gap offsets will be acquired as a single batch
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 75, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(90, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(76, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(76, initialReadGapOffset.gapStartOffset());
+ assertEquals(90, initialReadGapOffset.endOffset());
+ }
+
+
+ @Test
+ public void
testAcquireWhenRecordsFetchedFromGapAndMaxFetchRecordsIsExceeded() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(11L, 20L,
RecordState.ACKNOWLEDGED.id, (short) 2),
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Creating 3 batches of records with a total of 8 records
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 3, 21).close();
+ memoryRecordsBuilder(buffer, 3, 24).close();
+ memoryRecordsBuilder(buffer, 2, 27).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ 6, // maxFetchRecords is less than the number of records
fetched
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 6);
+
+ // Since max fetch records (6) is less than the number of records
fetched (8), only 6 records will be acquired
+ assertArrayEquals(expectedAcquiredRecord(21, 26, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(21, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(27, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(27, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testAcquireMaxFetchRecordsExceededAfterAcquiringGaps() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11-20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Creating 2 batches of records with a total of 20 records
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 10, 11).close();
+ memoryRecordsBuilder(buffer, 10, 21).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ 8, // maxFetchRecords is less than the number of records
fetched
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(21, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(21, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testAcquireMaxFetchRecordsExceededBeforeAcquiringGaps() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(11L, 20L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(31L, 40L,
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 21-30
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Creating 2 batches of records with a total of 30 records
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 10, 11).close();
+ memoryRecordsBuilder(buffer, 20, 21).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ 8, // maxFetchRecords is less than the number of records
fetched
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecord(11, 20, 3).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(21, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(21, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireWhenRecordsFetchedFromGapAndPartitionContainsNaturalGaps() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 10L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(15L, 20L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14
+ new PersisterStateBatch(30L, 40L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-29
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 11, 10).close();
+ memoryRecordsBuilder(buffer, 21, 30).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 24);
+
+ // Acquired batches will contain the following ->
+ // 1. 10-14 (gap offsets)
+ // 2. 21-29 (gap offsets)
+ // 3. 41-50 (new offsets beyond the cached end offset)
+ // The offsets fetched from the partition include a natural gap from 21
to 29. The cached state also
+ // contains the gap from 21 to 29. But since the broker does not parse
the fetched records, it is not
+ // aware of this natural gap. In this case, the gap will be acquired,
and it is the client's
+ // responsibility to inform the broker about this gap.
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(10, 14, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 29, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(10, sharePartition.startOffset());
+ assertEquals(50, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(51, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNull(initialReadGapOffset);
+ }
+
+ @Test
+ public void
testAcquireCachedStateInitialGapMatchesWithActualPartitionGap() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Creating 2 batches starting from 21, such that there is a natural
gap from 11 to 20
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 15, 21).close();
+ memoryRecordsBuilder(buffer, 25, 36).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 20);
+
+ // Acquired batches will contain the following ->
+ // 1. 31-40 (gap offsets)
+ // 2. 51-60 (new offsets)
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(31, 40, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
- assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(),
acquiredRecordsList.toArray());
- assertEquals(15, sharePartition.nextFetchOffset());
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(60, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(61, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNull(initialReadGapOffset);
}
@Test
- public void testAcquireReleasedRecordMultipleBatches() {
- SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
- // First fetch request with 5 records starting from offset 10.
- MemoryRecords records1 = memoryRecords(5, 10);
- // Second fetch request with 5 records starting from offset 15.
- MemoryRecords records2 = memoryRecords(5, 15);
- // Third fetch request with 5 records starting from offset 23, gap of
3 offsets.
- MemoryRecords records3 = memoryRecords(5, 23);
- // Fourth fetch request with 5 records starting from offset 28.
- MemoryRecords records4 = memoryRecords(5, 28);
+ public void
testAcquireCachedStateInitialGapOverlapsWithActualPartitionGap() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Creating 2 batches starting from 16, such that there is a natural
gap from 11 to 15
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 20, 16).close();
+ memoryRecordsBuilder(buffer, 25, 36).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 40, 3, records1,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 5);
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 25);
- assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(),
acquiredRecordsList.toArray());
- assertEquals(15, sharePartition.nextFetchOffset());
+ // Acquired batches will contain the following ->
+ // 1. 16-20 (gap offsets)
+ // 2. 31-40 (gap offsets)
+ // 3. 51-60 (new offsets)
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(16, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
- acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 30, 3, records2,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 5);
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(16, sharePartition.startOffset());
+ assertEquals(60, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(61, sharePartition.nextFetchOffset());
- assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(),
acquiredRecordsList.toArray());
- assertEquals(20, sharePartition.nextFetchOffset());
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNull(initialReadGapOffset);
+ }
- acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 30, 3, records3,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 5);
+ @Test
+ public void
testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGap() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
- assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(),
acquiredRecordsList.toArray());
- assertEquals(28, sharePartition.nextFetchOffset());
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
- acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 30, 3, records4,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 5);
+ sharePartition.maybeInitialize();
- assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(),
acquiredRecordsList.toArray());
- assertEquals(33, sharePartition.nextFetchOffset());
+ // Creating 3 batches starting from 11, such that there is a natural
gap from 36 to 40
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 10, 11).close();
+ memoryRecordsBuilder(buffer, 15, 21).close();
+ memoryRecordsBuilder(buffer, 20, 41).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
- assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
- assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(15L).batchState());
- assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(23L).batchState());
- assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(28L).batchState());
- assertNull(sharePartition.cachedState().get(10L).offsetState());
- assertNull(sharePartition.cachedState().get(15L).offsetState());
- assertNull(sharePartition.cachedState().get(23L).offsetState());
- assertNull(sharePartition.cachedState().get(28L).offsetState());
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 30);
+
+ // Acquired batches will contain the following ->
+ // 1. 11-20 (gap offsets)
+ // 2. 31-40 (gap offsets)
+ // 3. 51-60 (new offsets)
+ // The entire gap of 31 to 40 will be acquired even when the fetched
records only contain offsets 31 to 35 because
+ // we rely on the client to inform the broker about these natural gaps
in the partition log
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
- CompletableFuture<Void> ackResult = sharePartition.acknowledge(
- MEMBER_ID,
- Collections.singletonList(new ShareAcknowledgementBatch(12, 30,
Collections.singletonList((byte) 2))));
- assertNull(ackResult.join());
- assertFalse(ackResult.isCompletedExceptionally());
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(60, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(61, sharePartition.nextFetchOffset());
- assertEquals(12, sharePartition.nextFetchOffset());
- assertEquals(4, sharePartition.cachedState().size());
- assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(10L).batchState());
- assertNotNull(sharePartition.cachedState().get(10L).offsetState());
- assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
- assertNull(sharePartition.cachedState().get(15L).offsetState());
- assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(15L).batchMemberId());
- assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(23L).batchState());
- assertNull(sharePartition.cachedState().get(23L).offsetState());
- assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(23L).batchMemberId());
- assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(28L).batchState());
- assertNotNull(sharePartition.cachedState().get(28L).offsetState());
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNull(initialReadGapOffset);
+ }
- Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
- expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
- expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
- expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
+ @Test
+ public void testAcquireWhenRecordsFetchedAfterGapsAreFetched() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(11L, 20L,
RecordState.ACKNOWLEDGED.id, (short) 2),
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21 to 30
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
- expectedOffsetStateMap.clear();
- expectedOffsetStateMap.put(28L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(29L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(30L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
- expectedOffsetStateMap.put(31L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
- expectedOffsetStateMap.put(32L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
- assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(28L).offsetState());
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
- // Send next batch from offset 12, only 3 records should be acquired.
- acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 40, 3, records1,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 3);
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
- assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(),
acquiredRecordsList.toArray());
- assertEquals(15, sharePartition.nextFetchOffset());
+ sharePartition.maybeInitialize();
- // Though record2 batch exists to acquire but send batch record3, it
should be acquired but
- // next fetch offset should not move.
- acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ // Fetched records are from 21 to 35
+ MemoryRecords records = memoryRecords(15, 21);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 40, 3, records3,
+ MAX_FETCH_RECORDS, // maxFetchRecords is less than the number of
records fetched
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 5);
+ 10);
- assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(),
acquiredRecordsList.toArray());
- assertEquals(15, sharePartition.nextFetchOffset());
+ // Since the gap is only from 21 to 30 and the next batch is ARCHIVED,
only 10 gap offsets will be acquired as a single batch
+ assertArrayEquals(expectedAcquiredRecord(21, 30, 1).toArray(),
acquiredRecordsList.toArray());
- // Acquire partial records from batch 2.
- MemoryRecords subsetRecords = memoryRecords(2, 17);
- acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 2);
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(21, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(41, sharePartition.nextFetchOffset());
- assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(),
acquiredRecordsList.toArray());
- // Next fetch offset should not move.
- assertEquals(15, sharePartition.nextFetchOffset());
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
- // Acquire partial records from record 4 to further test if the next
fetch offset move
- // accordingly once complete record 2 is also acquired.
- subsetRecords = memoryRecords(1, 28);
- acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 1);
+ assertEquals(31, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
- assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(),
acquiredRecordsList.toArray());
- // Next fetch offset should not move.
- assertEquals(15, sharePartition.nextFetchOffset());
+ // Fetching from the nextFetchOffset so that endOffset moves ahead
+ records = memoryRecords(15, 41);
- // Try to acquire complete record 2 though it's already partially
acquired, the next fetch
- // offset should move.
acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 20, 3, records2,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 3);
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS, // maxFetchRecords is less than the number
of records fetched
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 15);
- // Offset 15,16 and 19 should be acquired.
- List<AcquiredRecords> expectedAcquiredRecords =
expectedAcquiredRecords(15, 16, 2);
- expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
- // Next fetch offset should not move.
- assertEquals(29, sharePartition.nextFetchOffset());
+ assertArrayEquals(expectedAcquiredRecord(41, 55, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(21, sharePartition.startOffset());
+ assertEquals(55, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(56, sharePartition.nextFetchOffset());
+
+ // Since the endOffset is now moved ahead, the initialReadGapOffset
should be empty
+ initialReadGapOffset = sharePartition.initialReadGapOffset();
+ assertNull(initialReadGapOffset);
}
@Test
@@ -5193,6 +6157,55 @@ public class SharePartitionTest {
assertEquals(200, sharePartition.nextFetchOffset());
}
+ @Test
+ public void testMaybeUpdateCachedStateGapAfterLastOffsetAcknowledged() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(11L, 20L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21 to 30
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Acquiring the first AVAILABLE batch from 11 to 20
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, new
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(10, 11),
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
+ OptionalInt.empty(), false));
+ assertTrue(sharePartition.canAcquireRecords());
+
+ // Sending acknowledgment for the first batch from 11 to 20
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(11, 20, List.of((byte) 1))));
+
+ assertTrue(sharePartition.canAcquireRecords());
+ // After the acknowledgement is done successfully,
maybeUpdateCachedStateAndOffsets method is invoked to see
+ // if the start offset can be moved ahead. The last offset
acknowledged is 20. But instead of moving start
+ // offset to the next batch in the cached state (31 to 40), it is
moved to the next offset of the last
+ // acknowledged offset (21). This is because there is an acquirable
gap in the cached state from 21 to 30.
+ assertEquals(21, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(21, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(21, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
@Test
public void testCanAcquireRecordsReturnsTrue() {
SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
@@ -5749,16 +6762,16 @@ public class SharePartitionTest {
@Test
public void
testMaybeInitializeWhenReadStateRpcReturnsZeroAvailableRecords() {
List<PersisterStateBatch> stateBatches = new ArrayList<>();
+ stateBatches.add(new PersisterStateBatch(233L, 233L,
RecordState.ARCHIVED.id, (short) 1));
for (int i = 0; i < 500; i++) {
stateBatches.add(new PersisterStateBatch(234L + i, 234L + i,
RecordState.ACKNOWLEDGED.id, (short) 1));
}
- stateBatches.add(new PersisterStateBatch(232L, 232L,
RecordState.ARCHIVED.id, (short) 1));
Persister persister = Mockito.mock(Persister.class);
ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
- PartitionFactory.newPartitionAllData(0, 3, 232L,
Errors.NONE.code(), Errors.NONE.message(),
+ PartitionFactory.newPartitionAllData(0, 3, 233L,
Errors.NONE.code(), Errors.NONE.message(),
stateBatches)))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
@@ -5845,6 +6858,37 @@ public class SharePartitionTest {
assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
}
+ @Test
+ public void testFindLastOffsetAcknowledgedWhenGapAtBeginning() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // Since there is a gap in the beginning, the initialReadGapOffset
window is the same as the cachedState
+ assertEquals(11, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+
+ long lastOffsetAcknowledged =
sharePartition.findLastOffsetAcknowledged();
+
+ // Since the initialReadGapOffset window begins at startOffset, we
cannot count any of the offsets as acknowledged.
+ // Thus, lastOffsetAcknowledged should be -1
+ assertEquals(-1, lastOffsetAcknowledged);
+ }
+
private List<AcquiredRecords> fetchAcquiredRecords(ShareAcquiredRecords
shareAcquiredRecords, int expectedOffsetCount) {
assertNotNull(shareAcquiredRecords);
assertEquals(expectedOffsetCount, shareAcquiredRecords.count());