This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8fe1157ac5f MINOR: Adding check for invalid batch from persister in
Share Partition (#20979)
8fe1157ac5f is described below
commit 8fe1157ac5f6c2eaf79e21da3d9bfe5e979312a6
Author: Apoorv Mittal <[email protected]>
AuthorDate: Mon Nov 24 21:52:17 2025 +0000
MINOR: Adding check for invalid batch from persister in Share Partition
(#20979)
While writing unit tests, encountered incorrect further batches when
persister provides corrupt batches. Added a check to fail early rather
than sending incorrect batches to clients later.
Reviewers: Andrew Schofield <[email protected]>
---
.../main/java/kafka/server/share/SharePartition.java | 9 +++++++++
.../java/kafka/server/share/SharePartitionTest.java | 20 ++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 6f7041b92c0..78ab7c29897 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -490,6 +490,15 @@ public class SharePartition {
throwable = new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition));
return;
}
+
+ if (stateBatch.lastOffset() < stateBatch.firstOffset()) {
+ log.error("Invalid state batch found for the share
partition: {}-{}. The first offset: {}"
+ + " is less than the last offset of the batch:
{}.", groupId, topicIdPartition,
+ stateBatch.firstOffset(), stateBatch.lastOffset());
+ throwable = new
IllegalStateException(String.format("Failed to initialize the share partition
%s-%s", groupId, topicIdPartition));
+ return;
+ }
+
if (gapStartOffset == -1 && stateBatch.firstOffset() >
previousBatchLastOffset + 1) {
gapStartOffset = previousBatchLastOffset + 1;
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c8be18c027e..73c0242b81f 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -1804,6 +1804,26 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.deliveryCompleteCount());
}
+ @Test
+ public void testMaybeInitializeWithInvalidOffsetInBatch() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 5L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(5L, 10L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(11L, 10L,
RecordState.ARCHIVED.id, (short) 3)))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(IllegalStateException.class, result);
+ assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
+ }
+
@Test
public void testAcquireSingleRecord() throws InterruptedException {
SharePartition sharePartition = SharePartitionBuilder.builder()