This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit dc545b2b6ecc180c546a896d8c314cf42ba7a619 Author: Bill Bejeck <[email protected]> AuthorDate: Mon Dec 9 12:16:34 2024 -0500 MINOR: Added tests and side cleanup (#18069) Adding a couple of tests and some side cleanup. Co-authored-by: Bruno Cadonna <[email protected]> Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy <[email protected]> --- .../coordinator/group/GroupMetadataManager.java | 19 +++++++++++-- .../group/GroupMetadataManagerTest.java | 31 +++++++++++++++++++++- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 7244b20d052..024959e0d92 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1446,6 +1446,22 @@ public class GroupMetadataManager { } } + /** + * Throws an InvalidRequestException if the value is null or empty. + * + * @param value The value. + * @param error The error message. + * @throws InvalidRequestException + */ + private void throwIfNullOrEmpty( + String value, + String error + ) throws InvalidRequestException { + if (value == null || value.trim().isEmpty()) { + throw new InvalidRequestException(error); + } + } + /** * Validates the request. * @@ -1531,7 +1547,7 @@ public class GroupMetadataManager { throwIfEmptyString(request.groupId(), "GroupId can't be empty."); throwIfEmptyString(request.instanceId(), "InstanceId can't be empty."); throwIfEmptyString(request.rackId(), "RackId can't be empty."); - throwIfEmptyString(request.memberId(), "MemberId can't be empty."); + throwIfNullOrEmpty(request.memberId(), "MemberId can't be null or empty."); if (request.memberEpoch() == 0) { @@ -1549,7 +1565,6 @@ public class GroupMetadataManager { } // TODO: check that active, standby and warmup do not intersect } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) { - throwIfEmptyString(request.memberId(), "MemberId can't be empty."); throwIfNull(request.instanceId(), "InstanceId can't be null."); } else if ((request.memberEpoch() < 0) && (request.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH)) { throw new InvalidRequestException("MemberEpoch is invalid."); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 997db91bbb0..74fbd040513 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -340,7 +340,15 @@ public class GroupMetadataManagerTest { new StreamsGroupHeartbeatRequestData() .setGroupId("foo") .setMemberEpoch(0))); - assertEquals("MemberId can't be empty.", ex.getMessage()); + assertEquals("MemberId can't be null or empty.", ex.getMessage()); + + // MemberId must not be null in all requests + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberId(null) + .setMemberEpoch(0))); + assertEquals("MemberId can't be null or empty.", ex.getMessage()); // InstanceId must be non-empty if provided in all requests. ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( @@ -360,6 +368,27 @@ public class GroupMetadataManagerTest { .setRackId(""))); assertEquals("RackId can't be empty.", ex.getMessage()); + // InstanceId can't be null with static membership + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberId(memberId) + .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH) + .setInstanceId(null) + .setRackId("rackid"))); + assertEquals("InstanceId can't be null.", ex.getMessage()); + + // valid memberEpoch values are 0+, -1 (member leave group), or -2 (static member leave group) + ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberId(memberId) + .setMemberEpoch(-3) + .setInstanceId("bar") + .setRackId("baz"))); + assertEquals("MemberEpoch is invalid.", ex.getMessage()); + + // TODO: Test supplied topology }
