This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new ea5d5e67c2c MINOR: Added tests and side cleanup (#18069)
ea5d5e67c2c is described below
commit ea5d5e67c2c22386fa039442f12735e8e709eedb
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Dec 9 12:16:34 2024 -0500
MINOR: Added tests and side cleanup (#18069)
Adding a couple of tests and some side cleanup.
Co-authored-by: Bruno Cadonna <[email protected]>
Reviewers: Bruno Cadonna <[email protected]>, Lucas Brutschy <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 19 +++++++++++--
.../group/GroupMetadataManagerTest.java | 31 +++++++++++++++++++++-
2 files changed, 47 insertions(+), 3 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 7244b20d052..024959e0d92 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1446,6 +1446,22 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Throws an InvalidRequestException if the value is null or empty.
+ *
+ * @param value The value.
+ * @param error The error message.
+ * @throws InvalidRequestException
+ */
+ private void throwIfNullOrEmpty(
+ String value,
+ String error
+ ) throws InvalidRequestException {
+ if (value == null || value.trim().isEmpty()) {
+ throw new InvalidRequestException(error);
+ }
+ }
+
/**
* Validates the request.
*
@@ -1531,7 +1547,7 @@ public class GroupMetadataManager {
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
- throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+ throwIfNullOrEmpty(request.memberId(), "MemberId can't be null or empty.");
if (request.memberEpoch() == 0) {
@@ -1549,7 +1565,6 @@ public class GroupMetadataManager {
}
// TODO: check that active, standby and warmup do not intersect
} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
- throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
throwIfNull(request.instanceId(), "InstanceId can't be null.");
} else if ((request.memberEpoch() < 0) && (request.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH)) {
throw new InvalidRequestException("MemberEpoch is invalid.");
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 997db91bbb0..74fbd040513 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -340,7 +340,15 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberEpoch(0)));
- assertEquals("MemberId can't be empty.", ex.getMessage());
+ assertEquals("MemberId can't be null or empty.", ex.getMessage());
+
+ // MemberId must not be null in all requests
+ ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(null)
+ .setMemberEpoch(0)));
+ assertEquals("MemberId can't be null or empty.", ex.getMessage());
// InstanceId must be non-empty if provided in all requests.
ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
@@ -360,6 +368,27 @@ public class GroupMetadataManagerTest {
.setRackId("")));
assertEquals("RackId can't be empty.", ex.getMessage());
+ // InstanceId can't be null with static membership
+ ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .setInstanceId(null)
+ .setRackId("rackid")));
+ assertEquals("InstanceId can't be null.", ex.getMessage());
+
+ // valid memberEpoch values are 0+, -1 (member leave group), or -2 (static member leave group)
+ ex = assertThrows(InvalidRequestException.class, () -> context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(-3)
+ .setInstanceId("bar")
+ .setRackId("baz")));
+ assertEquals("MemberEpoch is invalid.", ex.getMessage());
+
+
// TODO: Test supplied topology
}