This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 57737a357fa KAFKA-18188; Admin LeaveGroup should allow removing member 
using consumer protocol by member id (#18116)
57737a357fa is described below

commit 57737a357fad495964c1aefc593b67d107098d38
Author: David Jacot <[email protected]>
AuthorDate: Wed Dec 11 08:17:32 2024 +0100

    KAFKA-18188; Admin LeaveGroup should allow removing member using consumer 
protocol by member id (#18116)
    
    The LeaveGroup API is used by the admin client to remove static members or 
to remove all members from a group. The latter does not work because the API 
does not allow a member that uses the CONSUMER protocol to be removed by its 
member id. Moreover, the response should only include the member id if the 
member id was included in the request, i.e. it should echo back the member id 
from the request. This patch fixes both issues.
    
    Reviewers: Dongnuo Lyu <[email protected]>, Christo Lolov 
<[email protected]>, Jeff Kim <[email protected]>
---
 .../unit/kafka/server/LeaveGroupRequestTest.scala  | 49 +++++++++++++++------
 .../coordinator/group/GroupMetadataManager.java    | 42 ++++++++----------
 .../group/GroupMetadataManagerTest.java            | 51 ++++++++++++----------
 3 files changed, 83 insertions(+), 59 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 84e609fcd92..4cc3f968d27 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -49,29 +49,50 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorBa
       numPartitions = 3
     )
 
+    def instanceId(memberId: String): String = "instance_" + memberId
+    val memberIds = Range(0, 3).map { __ =>
+      Uuid.randomUuid().toString
+    }
+
     for (version <- 3 to 
ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) {
-      val memberId = Uuid.randomUuid().toString
-      assertEquals(Errors.NONE.code, consumerGroupHeartbeat(
-        groupId = "group",
-        memberId = memberId,
-        memberEpoch = 0,
-        instanceId = "instance-id",
-        rebalanceTimeoutMs = 5 * 60 * 1000,
-        subscribedTopicNames = List("foo"),
-        topicPartitions = List.empty,
-      ).errorCode)
+      // Join with all the members.
+      memberIds.foreach { memberId =>
+        assertEquals(Errors.NONE.code, consumerGroupHeartbeat(
+          groupId = "group",
+          memberId = memberId,
+          memberEpoch = 0,
+          instanceId = instanceId(memberId),
+          rebalanceTimeoutMs = 5 * 60 * 1000,
+          subscribedTopicNames = List("foo"),
+          topicPartitions = List.empty,
+        ).errorCode)
+      }
 
       assertEquals(
         new LeaveGroupResponseData()
           .setMembers(List(
             new LeaveGroupResponseData.MemberResponse()
-              .setMemberId(memberId)
-              .setGroupInstanceId("instance-id")
+              .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+              .setGroupInstanceId(instanceId(memberIds(0))),
+            new LeaveGroupResponseData.MemberResponse()
+              .setMemberId(memberIds(1))
+              .setGroupInstanceId(instanceId(memberIds(1))),
+            new LeaveGroupResponseData.MemberResponse()
+              .setMemberId(memberIds(2))
+              .setGroupInstanceId(null)
           ).asJava),
         classicLeaveGroup(
           groupId = "group",
-          memberIds = List(JoinGroupRequest.UNKNOWN_MEMBER_ID),
-          groupInstanceIds = List("instance-id"),
+          memberIds = List(
+            JoinGroupRequest.UNKNOWN_MEMBER_ID,
+            memberIds(1),
+            memberIds(2)
+          ),
+          groupInstanceIds = List(
+            instanceId(memberIds(0)),
+            instanceId(memberIds(1)),
+            null
+          ),
           version = version.toShort
         )
       )
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index dd0c6954088..6b9a5e7cbd5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -6020,7 +6020,7 @@ public class GroupMetadataManager {
         }
 
         if (group.type() == CLASSIC) {
-            return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+            return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
request);
         } else if (group.type() == CONSUMER) {
             return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
request);
         } else {
@@ -6046,48 +6046,46 @@ public class GroupMetadataManager {
         List<CoordinatorRecord> records = new ArrayList<>();
 
         for (MemberIdentity memberIdentity : request.members()) {
-            String memberId = memberIdentity.memberId();
-            String instanceId = memberIdentity.groupInstanceId();
             String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided";
 
-            ConsumerGroupMember member;
             try {
-                if (instanceId == null) {
-                    member = group.getOrMaybeCreateMember(memberId, false);
-                    throwIfMemberDoesNotUseClassicProtocol(member);
+                ConsumerGroupMember member;
+
+                if (memberIdentity.groupInstanceId() == null) {
+                    member = 
group.getOrMaybeCreateMember(memberIdentity.memberId(), false);
 
                     log.info("[GroupId {}] Dynamic member {} has left group " +
                             "through explicit `LeaveGroup` request; client 
reason: {}",
-                        groupId, memberId, reason);
+                        groupId, memberIdentity.memberId(), reason);
                 } else {
-                    member = group.staticMember(instanceId);
-                    throwIfStaticMemberIsUnknown(member, instanceId);
+                    member = 
group.staticMember(memberIdentity.groupInstanceId());
+                    throwIfStaticMemberIsUnknown(member, 
memberIdentity.groupInstanceId());
                     // The LeaveGroup API allows administrative removal of 
members by GroupInstanceId
                     // in which case we expect the MemberId to be undefined.
-                    if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
-                        throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
-                        throwIfMemberDoesNotUseClassicProtocol(member);
+                    if (!UNKNOWN_MEMBER_ID.equals(memberIdentity.memberId())) {
+                        throwIfInstanceIdIsFenced(member, groupId, 
memberIdentity.memberId(), memberIdentity.groupInstanceId());
                     }
 
-                    memberId = member.memberId();
                     log.info("[GroupId {}] Static member {} with instance id 
{} has left group " +
                             "through explicit `LeaveGroup` request; client 
reason: {}",
-                        groupId, memberId, instanceId, reason);
+                        groupId, memberIdentity.memberId(), 
memberIdentity.groupInstanceId(), reason);
                 }
 
-                removeMember(records, groupId, memberId);
-                cancelTimers(groupId, memberId);
+                removeMember(records, groupId, member.memberId());
+                cancelTimers(groupId, member.memberId());
+
                 memberResponses.add(
                     new MemberResponse()
-                        .setMemberId(memberId)
-                        .setGroupInstanceId(instanceId)
+                        .setMemberId(memberIdentity.memberId())
+                        .setGroupInstanceId(memberIdentity.groupInstanceId())
                 );
+
                 validLeaveGroupMembers.add(member);
             } catch (KafkaException e) {
                 memberResponses.add(
                     new MemberResponse()
-                        .setMemberId(memberId)
-                        .setGroupInstanceId(instanceId)
+                        .setMemberId(memberIdentity.memberId())
+                        .setGroupInstanceId(memberIdentity.groupInstanceId())
                         .setErrorCode(Errors.forException(e).code())
                 );
             }
@@ -6126,7 +6124,6 @@ public class GroupMetadataManager {
      * Handle a classic LeaveGroupRequest to a ClassicGroup.
      *
      * @param group          The ClassicGroup.
-     * @param context        The request context.
      * @param request        The actual LeaveGroup request.
      *
      * @return The LeaveGroup response and the GroupMetadata record to append 
if the group
@@ -6134,7 +6131,6 @@ public class GroupMetadataManager {
      */
     private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeaveToClassicGroup(
         ClassicGroup group,
-        RequestContext context,
         LeaveGroupRequestData request
     ) throws UnknownMemberIdException {
         if (group.isInState(DEAD)) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 18540253487..0535f763b4f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -117,8 +117,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
-import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
-import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
 import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
@@ -13861,15 +13859,17 @@ public class GroupMetadataManagerTest {
         context.assertJoinTimeout(groupId, memberId2, 
member2.rebalanceTimeoutMs());
         context.assertSessionTimeout(groupId, memberId2, 
member2.classicMemberMetadata().get().sessionTimeoutMs());
 
-        // Member 1 and member 2 leave the group.
+        // Member 1, member 2 and member 3 leave the group.
         CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
leaveResult = context.sendClassicGroupLeave(
             new LeaveGroupRequestData()
                 .setGroupId("group-id")
                 .setMembers(List.of(
                     // Valid member id.
                     new MemberIdentity()
-                        .setMemberId(memberId1),
+                        .setMemberId(memberId1)
+                        .setGroupInstanceId(null),
                     new MemberIdentity()
+                        .setMemberId(UNKNOWN_MEMBER_ID)
                         .setGroupInstanceId(instanceId2),
                     // Member that doesn't use the classic protocol.
                     new MemberIdentity()
@@ -13877,8 +13877,10 @@ public class GroupMetadataManagerTest {
                         .setGroupInstanceId(instanceId3),
                     // Unknown member id.
                     new MemberIdentity()
-                        .setMemberId("unknown-member-id"),
+                        .setMemberId("unknown-member-id")
+                        .setGroupInstanceId(null),
                     new MemberIdentity()
+                        .setMemberId(UNKNOWN_MEMBER_ID)
                         .setGroupInstanceId("unknown-instance-id"),
                     // Fenced instance id.
                     new MemberIdentity()
@@ -13895,11 +13897,10 @@ public class GroupMetadataManagerTest {
                         .setMemberId(memberId1),
                     new LeaveGroupResponseData.MemberResponse()
                         .setGroupInstanceId(instanceId2)
-                        .setMemberId(memberId2),
+                        .setMemberId(UNKNOWN_MEMBER_ID),
                     new LeaveGroupResponseData.MemberResponse()
                         .setGroupInstanceId(instanceId3)
-                        .setMemberId(memberId3)
-                        .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+                        .setMemberId(memberId3),
                     new LeaveGroupResponseData.MemberResponse()
                         .setGroupInstanceId(null)
                         .setMemberId("unknown-member-id")
@@ -13908,8 +13909,8 @@ public class GroupMetadataManagerTest {
                         .setGroupInstanceId("unknown-instance-id")
                         .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
                     new LeaveGroupResponseData.MemberResponse()
-                        .setGroupInstanceId(instanceId3)
                         .setMemberId("unknown-member-id")
+                        .setGroupInstanceId(instanceId3)
                         .setErrorCode(Errors.FENCED_INSTANCE_ID.code())
                 )),
             leaveResult.response()
@@ -13924,6 +13925,12 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId2),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId2),
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId2),
+            // Remove member 3.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId3),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId3),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId3),
+            // Update subscription metadata.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Collections.emptyMap()),
             // Bump the group epoch.
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11)
         );
@@ -14045,7 +14052,7 @@ public class GroupMetadataManagerTest {
         String groupId = "group-id";
         String memberId = Uuid.randomUuid().toString();
 
-        // Consumer group without member using the classic protocol.
+        // Consumer group.
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withMember(new ConsumerGroupMember.Builder(memberId)
@@ -14058,9 +14065,7 @@ public class GroupMetadataManagerTest {
                 .setGroupId("group-id")
                 .setMembers(List.of(
                     new MemberIdentity()
-                        .setMemberId("unknown-member-id"),
-                    new MemberIdentity()
-                        .setMemberId(memberId)
+                        .setMemberId("unknown-member-id")
                 ))
         );
 
@@ -14070,10 +14075,6 @@ public class GroupMetadataManagerTest {
                     new LeaveGroupResponseData.MemberResponse()
                         .setGroupInstanceId(null)
                         .setMemberId("unknown-member-id")
-                        .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
-                    new LeaveGroupResponseData.MemberResponse()
-                        .setGroupInstanceId(null)
-                        .setMemberId(memberId)
                         .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
                 )),
             leaveResult.response()
@@ -16047,9 +16048,15 @@ public class GroupMetadataManagerTest {
             new LeaveGroupRequestData()
                 .setGroupId(groupId)
                 .setMembers(List.of(
-                    new MemberIdentity().setGroupInstanceId(memberId1),
-                    new MemberIdentity().setGroupInstanceId(memberId2),
-                    new MemberIdentity().setGroupInstanceId(memberId3)
+                    new MemberIdentity()
+                        .setMemberId(memberId1)
+                        .setGroupInstanceId(null),
+                    new MemberIdentity()
+                        .setMemberId(memberId2)
+                        .setGroupInstanceId(memberId2),
+                    new MemberIdentity()
+                        .setMemberId(UNKNOWN_MEMBER_ID)
+                        .setGroupInstanceId(memberId3)
                 ))
         );
 
@@ -16058,12 +16065,12 @@ public class GroupMetadataManagerTest {
                 .setMembers(List.of(
                     new LeaveGroupResponseData.MemberResponse()
                         .setMemberId(memberId1)
-                        .setGroupInstanceId(memberId1),
+                        .setGroupInstanceId(null),
                     new LeaveGroupResponseData.MemberResponse()
                         .setMemberId(memberId2)
                         .setGroupInstanceId(memberId2),
                     new LeaveGroupResponseData.MemberResponse()
-                        .setMemberId(memberId3)
+                        .setMemberId(UNKNOWN_MEMBER_ID)
                         .setGroupInstanceId(memberId3)
                 )),
             result.response()

Reply via email to