This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84f810e49b9 MINOR: Resolve hidden NPE in RequestQuotaTest (#21587)
84f810e49b9 is described below

commit 84f810e49b9eb1ca554b95059f6ab1c720d37b0a
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Feb 26 14:02:36 2026 +0000

    MINOR: Resolve hidden NPE in RequestQuotaTest (#21587)
    
    `RequestQuotaTest` was silently hitting an NPE when testing
    `SHARE_ACKNOWLEDGE`. This is because the default for the group ID in
    this request is null, even though this is never actually used in
    practice by a real client. The construction of
    `ShareAcknowledgeRequestData` in this test did not initialize a specific
    value for group ID, and this means it was left as null. The result was
    an NPE while the broker was handling the request, which was not the
    intended behavior of the test.
    
    The PR explicitly handles null for group ID and member ID in
    `SHARE_FETCH` and `SHARE_ACKNOWLEDGE` requests so that we are not
    relying on the overall exception handling for this situation. In
    practice, this would not be necessary for a real client, but the
    defensive code makes sense for this test (or a poorly written client).
    It also initialises the request in the test case with a non-null group
    ID and member ID for `SHARE_ACKNOWLEDGE` which aligns with what already
    exists for `SHARE_FETCH`.
    
    Reviewers: Apoorv Mittal <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala           | 14 +++++++++++++-
 .../test/scala/unit/kafka/server/RequestQuotaTest.scala    |  8 +++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 93b32a50192..00659864384 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3172,6 +3172,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val groupId = shareFetchRequest.data.groupId
 
+    if (groupId == null) {
+      requestHelper.sendMaybeThrottle(request,
+        shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception("Invalid group id in the request.")))
+      return CompletableFuture.completedFuture[Unit](())
+    }
+
     // Share Fetch needs permission to perform the READ action on the named 
group resource (groupId)
     if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
       requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, 
Errors.GROUP_AUTHORIZATION_FAILED.exception))
@@ -3539,6 +3545,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val groupId = shareAcknowledgeRequest.data.groupId
 
+    if (groupId == null) {
+      requestHelper.sendMaybeThrottle(request,
+        shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception("Invalid group id in the request.")))
+      return CompletableFuture.completedFuture[Unit](())
+    }
+
     // Share Acknowledge needs permission to perform READ action on the named 
group resource (groupId)
     if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
       requestHelper.sendMaybeThrottle(request,
@@ -4222,7 +4234,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    * @return boolean if the member id in the RPC is valid or not.
    */
   def isMemberIdValid(memberId: String): Boolean = {
-    memberId.nonEmpty && memberId.length <= 36
+    memberId != null && memberId.nonEmpty && memberId.length <= 36
   }
 
   private def updateRecordConversionStats(request: RequestChannel.Request,
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 32bbe8bf429..df8683fe65f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -329,6 +329,7 @@ class RequestQuotaTest extends BaseRequestTest {
                 )
               )
           )
+
         case ApiKeys.OFFSET_FETCH =>
           OffsetFetchRequest.Builder.forTopicNames(
             new OffsetFetchRequestData()
@@ -493,6 +494,7 @@ class RequestQuotaTest extends BaseRequestTest {
               .setHost("*")
               .setOperation(AclOperation.WRITE.code)
               .setPermissionType(AclPermissionType.DENY.code))))
+
         case ApiKeys.DELETE_ACLS =>
           new DeleteAclsRequest.Builder(new 
DeleteAclsRequestData().setFilters(util.List.of(
             new DeleteAclsRequestData.DeleteAclsFilter()
@@ -503,6 +505,7 @@ class RequestQuotaTest extends BaseRequestTest {
               .setHostFilter("*")
               .setOperation(AclOperation.ANY.code)
               .setPermissionType(AclPermissionType.DENY.code))))
+
         case ApiKeys.DESCRIBE_CONFIGS =>
           new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
             .setResources(util.List.of(new 
DescribeConfigsRequestData.DescribeConfigsResource()
@@ -719,7 +722,10 @@ class RequestQuotaTest extends BaseRequestTest {
                   ).iterator)))
 
         case ApiKeys.SHARE_ACKNOWLEDGE =>
-          new ShareAcknowledgeRequest.Builder(new 
ShareAcknowledgeRequestData())
+          new ShareAcknowledgeRequest.Builder(
+            new ShareAcknowledgeRequestData()
+              .setGroupId("test-share-group")
+              .setMemberId(Uuid.randomUuid().toString))
 
         case ApiKeys.ADD_RAFT_VOTER =>
           new AddRaftVoterRequest.Builder(new AddRaftVoterRequestData())

Reply via email to