This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit dc545b2b6ecc180c546a896d8c314cf42ba7a619
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Dec 9 12:16:34 2024 -0500

    MINOR: Added tests and side cleanup (#18069)
    
    Adding a couple of tests and some side cleanup.
    Co-authored-by: Bruno Cadonna <[email protected]>
    
    Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 19 +++++++++++--
 .../group/GroupMetadataManagerTest.java            | 31 +++++++++++++++++++++-
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 7244b20d052..024959e0d92 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1446,6 +1446,22 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Throws an InvalidRequestException if the value is null, empty, or contains only whitespace.
+     *
+     * @param value The value to check.
+     * @param error The message of the exception thrown when the check fails.
+     * @throws InvalidRequestException if the value is null or is empty after trimming.
+     */
+    private void throwIfNullOrEmpty(
+            String value,
+            String error
+    ) throws InvalidRequestException {
+        if (value == null || value.trim().isEmpty()) {
+            throw new InvalidRequestException(error);
+        }
+    }
+
     /**
      * Validates the request.
      *
@@ -1531,7 +1547,7 @@ public class GroupMetadataManager {
         throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
         throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
         throwIfEmptyString(request.rackId(), "RackId can't be empty.");
-        throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+        throwIfNullOrEmpty(request.memberId(), "MemberId can't be null or empty.");
 
 
         if (request.memberEpoch() == 0) {
@@ -1549,7 +1565,6 @@ public class GroupMetadataManager {
             }
             // TODO: check that active, standby and warmup do not intersect
         } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
-            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
             throwIfNull(request.instanceId(), "InstanceId can't be null.");
         } else if ((request.memberEpoch() <  0) && (request.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH)) {
             throw new InvalidRequestException("MemberEpoch is invalid.");
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 997db91bbb0..74fbd040513 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -340,7 +340,15 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberEpoch(0)));
-        assertEquals("MemberId can't be empty.", ex.getMessage());
+        assertEquals("MemberId can't be null or empty.", ex.getMessage());
+
+        // MemberId must not be null in all requests
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                        .setGroupId("foo")
+                        .setMemberId(null)
+                        .setMemberEpoch(0)));
+        assertEquals("MemberId can't be null or empty.", ex.getMessage());
 
         // InstanceId must be non-empty if provided in all requests.
         ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
@@ -360,6 +368,27 @@ public class GroupMetadataManagerTest {
                 .setRackId("")));
         assertEquals("RackId can't be empty.", ex.getMessage());
 
+        // InstanceId can't be null with static membership
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                        .setGroupId("foo")
+                        .setMemberId(memberId)
+                        .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+                        .setInstanceId(null)
+                        .setRackId("rackid")));
+        assertEquals("InstanceId can't be null.", ex.getMessage());
+
+        // valid memberEpoch values are 0+, -1 (member leave group), or -2 (static member leave group)
+        ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                        .setGroupId("foo")
+                        .setMemberId(memberId)
+                        .setMemberEpoch(-3)
+                        .setInstanceId("bar")
+                        .setRackId("baz")));
+        assertEquals("MemberEpoch is invalid.", ex.getMessage());
+
+
         // TODO: Test supplied topology
     }
 

Reply via email to