This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 978672b7249 KAFKA-19436: Restrict cache update for ongoing
batch/offset state (#20041) (#20647)
978672b7249 is described below
commit 978672b7249c902062ce5f758d32f476fb9b56fe
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Oct 7 14:45:58 2025 +0100
KAFKA-19436: Restrict cache update for ongoing batch/offset state (#20041)
(#20647)
Cherry-pick commit from
https://github.com/apache/kafka/commit/96ef1c520a
During stress testing it was noticed that, on acquisition lock timeout,
some offsets were not found in the cache. Different acknowledgement calls
can each attempt to update the cache, so if one transition is still
ongoing and a parallel acknowledgement triggers the cache update, the
cache can be updated incorrectly before the first transition has
finished.
The cache update happens only for Archived and Acknowledged records,
so neither this issue nor the existing implementation should hamper the
queues functionality. However, it might update the cache early when the
persister call might fail, and it can trigger error logs about an offset
not found in the cache when the acquisition lock times out (in some
scenarios).
Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 24 ++++--
.../kafka/server/share/SharePartitionTest.java | 97 ++++++++++++++++++++++
2 files changed, 115 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 09385d6c48c..eb0fedbe8cd 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2178,7 +2178,8 @@ public class SharePartition {
}
}
- private boolean canMoveStartOffset() {
+ // Visible for testing.
+ boolean canMoveStartOffset() {
// The Share Partition Start Offset may be moved after acknowledge
request is complete.
// The following conditions need to be met to move the startOffset:
// 1. When the cachedState is not empty.
@@ -2203,7 +2204,15 @@ public class SharePartition {
"as there is an acquirable gap at the beginning. Cannot move
the start offset.", startOffset, groupId, topicIdPartition);
return false;
}
- RecordState startOffsetState = entry.getValue().offsetState == null ?
+ boolean isBatchState = entry.getValue().offsetState() == null;
+ boolean isOngoingTransition = isBatchState ?
+ entry.getValue().batchHasOngoingStateTransition() :
+
entry.getValue().offsetState().get(startOffset).hasOngoingStateTransition();
+ if (isOngoingTransition) {
+ return false;
+ }
+
+ RecordState startOffsetState = isBatchState ?
entry.getValue().batchState() :
entry.getValue().offsetState().get(startOffset).state();
return isRecordStateAcknowledged(startOffsetState);
@@ -2238,13 +2247,13 @@ public class SharePartition {
}
if (inFlightBatch.offsetState() == null) {
- if
(!isRecordStateAcknowledged(inFlightBatch.batchState())) {
+ if (inFlightBatch.batchHasOngoingStateTransition() ||
!isRecordStateAcknowledged(inFlightBatch.batchState())) {
return lastOffsetAcknowledged;
}
lastOffsetAcknowledged = inFlightBatch.lastOffset();
} else {
for (Map.Entry<Long, InFlightState> offsetState :
inFlightBatch.offsetState.entrySet()) {
- if
(!isRecordStateAcknowledged(offsetState.getValue().state())) {
+ if (offsetState.getValue().hasOngoingStateTransition()
|| !isRecordStateAcknowledged(offsetState.getValue().state())) {
return lastOffsetAcknowledged;
}
lastOffsetAcknowledged = offsetState.getKey();
@@ -2913,7 +2922,8 @@ public class SharePartition {
return batchState;
}
- private boolean batchHasOngoingStateTransition() {
+ // Visible for testing.
+ boolean batchHasOngoingStateTransition() {
return inFlightState().hasOngoingStateTransition();
}
@@ -3034,7 +3044,8 @@ public class SharePartition {
acquisitionLockTimeoutTask = null;
}
- private boolean hasOngoingStateTransition() {
+ // Visible for testing.
+ boolean hasOngoingStateTransition() {
if (rollbackState == null) {
// This case could occur when the batch/offset hasn't
transitioned even once or the state transitions have
// been committed.
@@ -3067,6 +3078,7 @@ public class SharePartition {
return this;
} catch (IllegalStateException e) {
log.error("Failed to update state of the records", e);
+ rollbackState = null;
return null;
}
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 0cc7ad4d9a1..2c77e98d72a 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -6479,6 +6479,103 @@ public class SharePartitionTest {
assertEquals(-1, lastOffsetAcknowledged);
}
+ @Test
+ public void testCacheUpdateWhenBatchHasOngoingTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+ // Acquire a single batch.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS,
21,
+ fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+
+ // Validate that there is no ongoing transition.
+
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+ // Return a future which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future = new
CompletableFuture<>();
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+ // Acknowledge batch to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.ACCEPT.id))));
+
+ // Assert the start offset has not moved and batch has ongoing
transition.
+ assertEquals(21L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+
+ // Validate that offset can't be moved because batch has ongoing
transition.
+ assertFalse(sharePartition.canMoveStartOffset());
+ assertEquals(-1, sharePartition.findLastOffsetAcknowledged());
+
+ // Complete the future so acknowledge API can be completed, which
updates the cache.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+ future.complete(writeShareGroupStateResult);
+
+ // Validate the cache has been updated.
+ assertEquals(31L, sharePartition.startOffset());
+ assertTrue(sharePartition.cachedState().isEmpty());
+ }
+
+ @Test
+ public void testCacheUpdateWhenOffsetStateHasOngoingTransition() {
+ Persister persister = Mockito.mock(Persister.class);
+
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+ // Acquire a single batch.
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS,
21,
+ fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+ ), 10
+ );
+
+ // Validate that there is no ongoing transition.
+
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+ assertNull(sharePartition.cachedState().get(21L).offsetState());
+ // Return a future which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future = new
CompletableFuture<>();
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+ // Acknowledge offsets to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(21, 23, List.of(AcknowledgeType.ACCEPT.id))));
+
+ // Assert the start offset has not moved and offset state is now
maintained. Offset state should
+ // have ongoing transition.
+ assertEquals(21L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(21L).offsetState());
+
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(21L).hasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(22L).hasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(21L).offsetState().get(23L).hasOngoingStateTransition());
+ // Only 21, 22 and 23 offsets should have ongoing state transition as
the acknowledge request
+ // contains 21-23 offsets.
+
assertFalse(sharePartition.cachedState().get(21L).offsetState().get(24L).hasOngoingStateTransition());
+
+ // Validate that offset can't be moved because batch has ongoing
transition.
+ assertFalse(sharePartition.canMoveStartOffset());
+ assertEquals(-1, sharePartition.findLastOffsetAcknowledged());
+
+ // Complete the future so acknowledge API can be completed, which
updates the cache.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+ future.complete(writeShareGroupStateResult);
+
+ // Validate the cache has been updated.
+ assertEquals(24L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(21L));
+ }
+
/**
* Test the case where the fetch batch has first record offset greater
than the record batch start offset.
* Such batches can exist for compacted topics.