This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aef1fac1271 MINOR: Cleanup in SharePartition acknowledgement 
functionality (#21236)
aef1fac1271 is described below

commit aef1fac1271ab6065a19ddda12dd19ccfd70d6b2
Author: Abhinav Dixit <[email protected]>
AuthorDate: Sat Jan 3 16:14:38 2026 +0530

    MINOR: Cleanup in SharePartition acknowledgement functionality (#21236)
    
    ### About
    This PR performs cleanup in `SharePartition.java` acknowledgement flow.
    The `Objects.requireNonNull(recordState);` call is not needed. First,
    the check is already covered by this
    
[code](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L1000).
    Second, throwing a `NullPointerException` is the wrong response to an
    invalid acknowledge type provided by the client.
    
    ### Testing
    Added unit tests for invalid acknowledge types in batch and subset
    acknowledgements.
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 14 +++++---
 .../kafka/server/share/SharePartitionTest.java     | 40 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index baafdb24ae1..4dbbc6720b2 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2086,7 +2086,7 @@ public class SharePartition {
         Map<Long, Byte> ackTypeMap = new HashMap<>();
         for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
             byte ackType = batch.acknowledgeTypes().get(index);
-            // Validate
+            // Validate ackType except values 0, since 0 stands for gap.
             if (ackType != 0) {
                 AcknowledgeType.forId(ackType);
             }
@@ -2304,7 +2304,7 @@ public class SharePartition {
                     // Only valid for ACQUIRED offsets; the check above 
ensures this.
                     long key = offsetState.getKey();
                     InFlightState state = offsetState.getValue();
-                    log.debug("Renewing acq lock for {}-{} with offset {} in 
batch {} for member {}.",
+                    log.debug("Renewing acquisition lock for {}-{} with offset 
{} in batch {} for member {}.",
                         groupId, topicIdPartition, key, inFlightBatch, 
memberId);
                     state.cancelAndClearAcquisitionLockTimeoutTask();
                     AcquisitionLockTimerTask renewalTask = 
scheduleAcquisitionLockTimeout(memberId, key, key);
@@ -2315,7 +2315,9 @@ public class SharePartition {
                     // mapping between bytes and record state type. All ack 
types have been added except for RENEW which
                     // has been handled above.
                     RecordState recordState = 
ACK_TYPE_TO_RECORD_STATE.get(ackType);
-                    Objects.requireNonNull(recordState);
+                    if (recordState == null) {
+                        return Optional.of(new 
IllegalArgumentException("Unknown acknowledge type id: " + ackType));
+                    }
 
                     InFlightState updateResult = 
offsetState.getValue().startStateTransition(
                         recordState,
@@ -2372,7 +2374,7 @@ public class SharePartition {
             if (ackType == AcknowledgeType.RENEW.id) {
                 // Renew the acquisition lock timer for the complete batch. We 
have already
                 // checked that the batchState is ACQUIRED above.
-                log.debug("Renewing acq lock for {}-{} with batch {}-{} for 
member {}.",
+                log.debug("Renewing acquisition lock for {}-{} with batch 
{}-{} for member {}.",
                     groupId, topicIdPartition, inFlightBatch.firstOffset(), 
inFlightBatch.lastOffset(), memberId);
                 inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
                 AcquisitionLockTimerTask renewalTask = 
scheduleAcquisitionLockTimeout(memberId,
@@ -2387,7 +2389,9 @@ public class SharePartition {
             // either released or moved to a state where member id existence 
is not important. The member id
             // is only important when the batch is acquired.
             RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
-            Objects.requireNonNull(recordState);
+            if (recordState == null) {
+                return Optional.of(new IllegalArgumentException("Unknown 
acknowledge type id: " + ackType));
+            }
 
             InFlightState updateResult = 
inFlightBatch.startBatchStateTransition(
                 recordState,
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index f210ab22ef9..863b98a2b1e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -12069,6 +12069,46 @@ public class SharePartitionTest {
         assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
     }
 
+    @Test
+    public void testInvalidAcknowledgeTypeInBatchAcknowledgement() {
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records1 = memoryRecords(1, 10);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, records1, 10);
+        assertEquals(1, acquiredRecordsList.size());
+
+        // Invalid acknowledge type 5.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(1, 10, List.of((byte) 5))));
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(InvalidRequestException.class, ackResult);
+    }
+
+    @Test
+    public void testInvalidAcknowledgeTypeInSubsetAcknowledgement() {
+        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        MemoryRecords records1 = memoryRecords(1, 10);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition, records1, 10);
+        assertEquals(1, acquiredRecordsList.size());
+
+        // Invalid acknowledge type -1.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(2, 3, List.of((byte) -1))));
+        assertTrue(ackResult.isCompletedExceptionally());
+        assertFutureThrows(InvalidRequestException.class, ackResult);
+    }
+
     /**
      * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
      */

Reply via email to