This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 333bf4176e6 MINOR: Adding check for invalid batch from persister in 
Share Partition (#20979) (#20987)
333bf4176e6 is described below

commit 333bf4176e692252c9a33382328c064f9d47508b
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Nov 25 12:21:29 2025 +0000

    MINOR: Adding check for invalid batch from persister in Share Partition 
(#20979) (#20987)
    
    While writing unit tests, encounterd incorrect further batches when
    persister provides corrupt batches. Added a check to fail early rather
    sending incorrect batches to clients later.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../main/java/kafka/server/share/SharePartition.java |  9 +++++++++
 .../java/kafka/server/share/SharePartitionTest.java  | 20 ++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 6f7041b92c0..78ab7c29897 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -490,6 +490,15 @@ public class SharePartition {
                         throwable = new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition));
                         return;
                     }
+
+                    if (stateBatch.lastOffset() < stateBatch.firstOffset()) {
+                        log.error("Invalid state batch found for the share 
partition: {}-{}. The first offset: {}"
+                                + " is less than the last offset of the batch: 
{}.", groupId, topicIdPartition,
+                            stateBatch.firstOffset(), stateBatch.lastOffset());
+                        throwable = new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition));
+                        return;
+                    }
+
                     if (gapStartOffset == -1 && stateBatch.firstOffset() > 
previousBatchLastOffset + 1) {
                         gapStartOffset = previousBatchLastOffset + 1;
                     }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c8be18c027e..73c0242b81f 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -1804,6 +1804,26 @@ public class SharePartitionTest {
         assertEquals(3, sharePartition.deliveryCompleteCount());
     }
 
+    @Test
+    public void testMaybeInitializeWithInvalidOffsetInBatch() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 5L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(5L, 10L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(11L, 10L, 
RecordState.ARCHIVED.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertTrue(result.isCompletedExceptionally());
+        assertFutureThrows(IllegalStateException.class, result);
+        assertEquals(SharePartitionState.FAILED, 
sharePartition.partitionState());
+    }
+
     @Test
     public void testAcquireSingleRecord() throws InterruptedException {
         SharePartition sharePartition = SharePartitionBuilder.builder()

Reply via email to