This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aef1fac1271 MINOR: Cleanup in SharePartition acknowledgement
functionality (#21236)
aef1fac1271 is described below
commit aef1fac1271ab6065a19ddda12dd19ccfd70d6b2
Author: Abhinav Dixit <[email protected]>
AuthorDate: Sat Jan 3 16:14:38 2026 +0530
MINOR: Cleanup in SharePartition acknowledgement functionality (#21236)
### About
This PR cleans up the acknowledgement flow in `SharePartition.java`.
We do not need `Objects.requireNonNull(recordState);` for two reasons.
First, that case is already covered by this
[code](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L1000).
Second, throwing a `NullPointerException` is the wrong response to an
invalid acknowledge type provided by the client.
### Testing
Added unit tests covering an invalid acknowledge type in both batch and subset acknowledgement.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 14 +++++---
.../kafka/server/share/SharePartitionTest.java | 40 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index baafdb24ae1..4dbbc6720b2 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -2086,7 +2086,7 @@ public class SharePartition {
Map<Long, Byte> ackTypeMap = new HashMap<>();
for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
byte ackType = batch.acknowledgeTypes().get(index);
- // Validate
+ // Validate ackType except values 0, since 0 stands for gap.
if (ackType != 0) {
AcknowledgeType.forId(ackType);
}
@@ -2304,7 +2304,7 @@ public class SharePartition {
// Only valid for ACQUIRED offsets; the check above
ensures this.
long key = offsetState.getKey();
InFlightState state = offsetState.getValue();
- log.debug("Renewing acq lock for {}-{} with offset {} in
batch {} for member {}.",
+ log.debug("Renewing acquisition lock for {}-{} with offset
{} in batch {} for member {}.",
groupId, topicIdPartition, key, inFlightBatch,
memberId);
state.cancelAndClearAcquisitionLockTimeoutTask();
AcquisitionLockTimerTask renewalTask =
scheduleAcquisitionLockTimeout(memberId, key, key);
@@ -2315,7 +2315,9 @@ public class SharePartition {
// mapping between bytes and record state type. All ack
types have been added except for RENEW which
// has been handled above.
RecordState recordState =
ACK_TYPE_TO_RECORD_STATE.get(ackType);
- Objects.requireNonNull(recordState);
+ if (recordState == null) {
+ return Optional.of(new
IllegalArgumentException("Unknown acknowledge type id: " + ackType));
+ }
InFlightState updateResult =
offsetState.getValue().startStateTransition(
recordState,
@@ -2372,7 +2374,7 @@ public class SharePartition {
if (ackType == AcknowledgeType.RENEW.id) {
// Renew the acquisition lock timer for the complete batch. We
have already
// checked that the batchState is ACQUIRED above.
- log.debug("Renewing acq lock for {}-{} with batch {}-{} for
member {}.",
+ log.debug("Renewing acquisition lock for {}-{} with batch
{}-{} for member {}.",
groupId, topicIdPartition, inFlightBatch.firstOffset(),
inFlightBatch.lastOffset(), memberId);
inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
AcquisitionLockTimerTask renewalTask =
scheduleAcquisitionLockTimeout(memberId,
@@ -2387,7 +2389,9 @@ public class SharePartition {
// either released or moved to a state where member id existence
is not important. The member id
// is only important when the batch is acquired.
RecordState recordState = ACK_TYPE_TO_RECORD_STATE.get(ackType);
- Objects.requireNonNull(recordState);
+ if (recordState == null) {
+ return Optional.of(new IllegalArgumentException("Unknown
acknowledge type id: " + ackType));
+ }
InFlightState updateResult =
inFlightBatch.startBatchStateTransition(
recordState,
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index f210ab22ef9..863b98a2b1e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -12069,6 +12069,46 @@ public class SharePartitionTest {
assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
}
+ @Test
+ public void testInvalidAcknowledgeTypeInBatchAcknowledgement() {
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withReplicaManager(replicaManager)
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ MemoryRecords records1 = memoryRecords(1, 10);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition, records1, 10);
+ assertEquals(1, acquiredRecordsList.size());
+
+ // Invalid acknowledge type 5.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(1, 10, List.of((byte) 5))));
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(InvalidRequestException.class, ackResult);
+ }
+
+ @Test
+ public void testInvalidAcknowledgeTypeInSubsetAcknowledgement() {
+ ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withReplicaManager(replicaManager)
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ MemoryRecords records1 = memoryRecords(1, 10);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition, records1, 10);
+ assertEquals(1, acquiredRecordsList.size());
+
+ // Invalid acknowledge type -1.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(2, 3, List.of((byte) -1))));
+ assertTrue(ackResult.isCompletedExceptionally());
+ assertFutureThrows(InvalidRequestException.class, ackResult);
+ }
+
/**
* This function produces transactional data of a given no. of records
followed by a transactional marker (COMMIT/ABORT).
*/