This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e9057aab37a KAFKA-14502; Implement LeaveGroup protocol in new 
GroupCoordinator (#14147)
e9057aab37a is described below

commit e9057aab37a1aedc2917f82922aff11661d9a001
Author: Jeff Kim <[email protected]>
AuthorDate: Wed Sep 13 04:43:37 2023 -0400

    KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator (#14147)
    
    This patch implements the LeaveGroup API in the new group coordinator.
    
    Reviewers: David Jacot <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../coordinator/group/GroupCoordinatorService.java |  42 +-
 .../coordinator/group/GroupCoordinatorShard.java   |  18 +
 .../coordinator/group/GroupMetadataManager.java    | 147 +++++
 .../coordinator/group/generic/GenericGroup.java    |   4 +-
 .../group/runtime/CoordinatorResult.java           |   8 +-
 .../group/runtime/CoordinatorRuntime.java          |   4 +-
 .../group/GroupCoordinatorServiceTest.java         |  87 +++
 .../group/GroupMetadataManagerTest.java            | 599 ++++++++++++++++++++-
 9 files changed, 893 insertions(+), 18 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 87f10655bd7..6e946b1bead 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -330,7 +330,7 @@
     <suppress checks="ParameterNumber"
               files="(ConsumerGroupMember|GroupMetadataManager).java"/>
     <suppress checks="ClassDataAbstractionCouplingCheck"
-              
files="(RecordHelpersTest|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
+              
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
     <suppress checks="JavaNCSS"
               files="GroupMetadataManagerTest.java"/>
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index f018fa0ae6e..ac3aa9d45b4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@@ -79,6 +80,7 @@ import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
 
 /**
  * The group coordinator service.
@@ -302,7 +304,8 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             return responseFuture;
         }
 
-        runtime.scheduleWriteOperation("generic-group-join",
+        runtime.scheduleWriteOperation(
+            "generic-group-join",
             topicPartitionFor(request.groupId()),
             coordinator -> coordinator.genericGroupJoin(context, request, 
responseFuture)
         ).exceptionally(exception -> {
@@ -341,7 +344,8 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
 
         CompletableFuture<SyncGroupResponseData> responseFuture = new 
CompletableFuture<>();
 
-        runtime.scheduleWriteOperation("generic-group-sync",
+        runtime.scheduleWriteOperation(
+            "generic-group-sync",
             topicPartitionFor(request.groupId()),
             coordinator -> coordinator.genericGroupSync(context, request, 
responseFuture)
         ).exceptionally(exception -> {
@@ -411,9 +415,37 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        if (!isGroupIdNotEmpty(request.groupId())) {
+            return CompletableFuture.completedFuture(new 
LeaveGroupResponseData()
+                .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+        }
+
+        return runtime.scheduleWriteOperation(
+            "generic-group-leave",
+            topicPartitionFor(request.groupId()),
+            coordinator -> coordinator.genericGroupLeave(context, request)
+        ).exceptionally(exception -> {
+            if (!(exception instanceof KafkaException)) {
+                log.error("LeaveGroup request {} hit an unexpected exception: 
{}",
+                    request, exception.getMessage());
+            }
+
+            if (exception instanceof UnknownMemberIdException) {
+                // Group was not found.
+                List<LeaveGroupResponseData.MemberResponse> memberResponses = 
request.members().stream()
+                    .map(member -> new LeaveGroupResponseData.MemberResponse()
+                        .setMemberId(member.memberId())
+                        .setGroupInstanceId(member.groupInstanceId())
+                        .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()))
+                    .collect(Collectors.toList());
+
+                return new LeaveGroupResponseData()
+                    .setMembers(memberResponses);
+            }
+
+            return new LeaveGroupResponseData()
+                .setErrorCode(Errors.forException(exception).code());
+        });
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 94a4329f54c..ed68bb22c5a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import org.apache.kafka.common.message.OffsetCommitResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
@@ -306,6 +308,22 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
         return offsetMetadataManager.commitOffset(context, request);
     }
 
+    /**
+     * Handles a LeaveGroup request.
+     *
+     * @param context The request context.
+     * @param request The actual LeaveGroup request.
+     *
+     * @return A Result containing the LeaveGroup response and
+     *         a list of records to update the state machine.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws ApiException {
+        return groupMetadataManager.genericGroupLeave(context, request);
+    }
+
     /**
      * The coordinator has been loaded. This is used to apply any
      * post loading operations (e.g. registering timers).
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index fce74d58561..a174a0fb1ff 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -37,6 +37,10 @@ import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -2904,6 +2908,149 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Handle a generic LeaveGroupRequest.
+     *
+     * @param context        The request context.
+     * @param request        The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the GroupMetadata record to append 
if the group
+     *         no longer has any members.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), 
false);
+        if (group.isInState(DEAD)) {
+            return new CoordinatorResult<>(
+                Collections.emptyList(),
+                new LeaveGroupResponseData()
+                    .setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        List<MemberResponse> memberResponses = new ArrayList<>();
+
+        for (MemberIdentity member: request.members()) {
+            String reason = member.reason() != null ? member.reason() : "not 
provided";
+            // The LeaveGroup API allows administrative removal of members by 
GroupInstanceId
+            // in which case we expect the MemberId to be undefined.
+            if (UNKNOWN_MEMBER_ID.equals(member.memberId())) {
+                if (member.groupInstanceId() != null && 
group.hasStaticMember(member.groupInstanceId())) {
+                    removeCurrentMemberFromGenericGroup(
+                        group,
+                        group.staticMemberId(member.groupInstanceId()),
+                        reason
+                    );
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                    );
+                } else {
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                    );
+                }
+            } else if (group.isPendingMember(member.memberId())) {
+                group.remove(member.memberId());
+                timer.cancel(genericGroupHeartbeatKey(group.groupId(), 
member.memberId()));
+                log.info("[Group {}] Pending member {} has left group through 
explicit `LeaveGroup` request; client reason: {}",
+                    group.groupId(), member.memberId(), reason);
+
+                memberResponses.add(
+                    new MemberResponse()
+                        .setMemberId(member.memberId())
+                        .setGroupInstanceId(member.groupInstanceId())
+                );
+            } else {
+                try {
+                    group.validateMember(member.memberId(), 
member.groupInstanceId(), "leave-group");
+                    removeCurrentMemberFromGenericGroup(
+                        group,
+                        member.memberId(),
+                        reason
+                    );
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                    );
+                } catch (KafkaException e) {
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                            .setErrorCode(Errors.forException(e).code())
+                    );
+                }
+            }
+        }
+
+        List<String> validLeaveGroupMembers = memberResponses.stream()
+            .filter(response -> response.errorCode() == Errors.NONE.code())
+            .map(MemberResponse::memberId)
+            .collect(Collectors.toList());
+
+        String reason = "explicit `LeaveGroup` request for (" + String.join(", 
", validLeaveGroupMembers) + ") members.";
+        CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT;
+
+        if (!validLeaveGroupMembers.isEmpty()) {
+            switch (group.currentState()) {
+                case STABLE:
+                case COMPLETING_REBALANCE:
+                    coordinatorResult = 
maybePrepareRebalanceOrCompleteJoin(group, reason);
+                    break;
+                case PREPARING_REBALANCE:
+                    coordinatorResult = maybeCompleteJoinPhase(group);
+                    break;
+                default:
+            }
+        }
+
+        return new CoordinatorResult<>(
+            coordinatorResult.records(),
+            new LeaveGroupResponseData()
+                .setMembers(memberResponses),
+            coordinatorResult.appendFuture()
+        );
+    }
+
+    /**
+     * Remove a member from the group. Cancel member's heartbeat, and prepare 
rebalance
+     * or complete the join phase if necessary.
+     * 
+     * @param group     The generic group.
+     * @param memberId  The member id.
+     * @param reason    The reason for the LeaveGroup request.
+     *
+     */
+    private void removeCurrentMemberFromGenericGroup(
+        GenericGroup group,
+        String memberId,
+        String reason
+    ) {
+        GenericGroupMember member = group.member(memberId);
+        timer.cancel(genericGroupHeartbeatKey(group.groupId(), memberId));
+        log.info("[Group {}] Member {} has left group through explicit 
`LeaveGroup` request; client reason: {}",
+            group.groupId(), memberId, reason);
+
+        // New members may timeout with a pending JoinGroup while the group is 
still rebalancing, so we have
+        // to invoke the callback before removing the member. We return 
UNKNOWN_MEMBER_ID so that the consumer
+        // will retry the JoinGroup request if is still active.
+        group.completeJoinFuture(
+            member,
+            new JoinGroupResponseData()
+                .setMemberId(UNKNOWN_MEMBER_ID)
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+        );
+        group.remove(member.memberId());
+    }
+
     /**
      * Checks whether the given protocol type or name in the request is 
inconsistent with the group's.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index 3347970aaf0..d163f385fe0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -692,10 +692,10 @@ public class GenericGroup implements Group {
     }
 
     /**
-     * @return all static members in the group.
+     * @return the ids of all static members in the group.
      */
     public Set<String> allStaticMemberIds() {
-        return staticMembers.keySet();
+        return new HashSet<>(staticMembers.values());
     }
 
     // For testing only.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java
index 92907797581..53ce2f886de 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java
@@ -41,7 +41,7 @@ public class CoordinatorResult<T, U> {
     /**
      * The future to complete once the records are committed.
      */
-    private final CompletableFuture<T> appendFuture;
+    private final CompletableFuture<Void> appendFuture;
 
     /**
      * Constructs a Result with records and a response.
@@ -64,7 +64,7 @@ public class CoordinatorResult<T, U> {
      */
     public CoordinatorResult(
         List<U> records,
-        CompletableFuture<T> appendFuture
+        CompletableFuture<Void> appendFuture
     ) {
         this(records, null, appendFuture);
     }
@@ -79,7 +79,7 @@ public class CoordinatorResult<T, U> {
     public CoordinatorResult(
         List<U> records,
         T response,
-        CompletableFuture<T> appendFuture
+        CompletableFuture<Void> appendFuture
     ) {
         this.records = Objects.requireNonNull(records);
         this.response = response;
@@ -114,7 +114,7 @@ public class CoordinatorResult<T, U> {
     /**
      * @return The append-future.
      */
-    public CompletableFuture<T> appendFuture() {
+    public CompletableFuture<Void> appendFuture() {
         return appendFuture;
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 9ae0d1fc42a..9cf277d80c4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -698,10 +698,10 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
          */
         @Override
         public void complete(Throwable exception) {
-            CompletableFuture<T> appendFuture = result != null ? 
result.appendFuture() : null;
+            CompletableFuture<Void> appendFuture = result != null ? 
result.appendFuture() : null;
 
             if (exception == null) {
-                if (appendFuture != null) 
result.appendFuture().complete(result.response());
+                if (appendFuture != null) result.appendFuture().complete(null);
                 future.complete(result.response());
             } else {
                 if (appendFuture != null) 
result.appendFuture().completeExceptionally(exception);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index c5e0c97555b..61af5582824 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.network.ClientInformation;
@@ -62,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentMatchers;
 
 import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.OptionalInt;
 import java.util.Properties;
@@ -701,4 +705,87 @@ public class GroupCoordinatorServiceTest {
 
         assertEquals(response, future.get(5, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void testLeaveGroup() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        LeaveGroupRequestData request = new LeaveGroupRequestData()
+            .setGroupId("foo");
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("generic-group-leave"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new LeaveGroupResponseData()
+        ));
+
+        CompletableFuture<LeaveGroupResponseData> future = service.leaveGroup(
+            requestContext(ApiKeys.LEAVE_GROUP),
+            request
+        );
+
+        assertTrue(future.isDone());
+        assertEquals(new LeaveGroupResponseData(), future.get());
+    }
+
+    @Test
+    public void testLeaveGroupThrowsUnknownMemberIdException() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+
+        LeaveGroupRequestData request = new LeaveGroupRequestData()
+            .setGroupId("foo")
+            .setMembers(Arrays.asList(
+                new LeaveGroupRequestData.MemberIdentity()
+                    .setMemberId("member-1")
+                    .setGroupInstanceId("instance-1"),
+                new LeaveGroupRequestData.MemberIdentity()
+                    .setMemberId("member-2")
+                    .setGroupInstanceId("instance-2")
+            ));
+
+        service.startup(() -> 1);
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("generic-group-leave"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new UnknownMemberIdException()
+        ));
+
+        CompletableFuture<LeaveGroupResponseData> future = service.leaveGroup(
+            requestContext(ApiKeys.LEAVE_GROUP),
+            request
+        );
+
+        assertTrue(future.isDone());
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setMembers(Arrays.asList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setMemberId("member-1")
+                    .setGroupInstanceId("instance-1")
+                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setMemberId("member-2")
+                    .setGroupInstanceId("instance-2")
+                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+            ));
+
+        assertEquals(expectedResponse, future.get());
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ef553db41b9..3a735db37e3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -38,6 +38,9 @@ import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -738,7 +741,6 @@ public class GroupMetadataManagerTest {
             return new SyncResult(responseFuture, coordinatorResult);
         }
 
-
         public RebalanceResult staticMembersJoinAndRebalance(
             String groupId,
             String leaderInstanceId,
@@ -854,7 +856,7 @@ public class GroupMetadataManagerTest {
             );
         }
 
-        public JoinGroupResponseData setupGroupWithPendingMember(GenericGroup 
group) throws Exception {
+        public PendingMemberGroupResult 
setupGroupWithPendingMember(GenericGroup group) throws Exception {
             // Add the first member
             JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
                 .withGroupId("group-id")
@@ -957,9 +959,14 @@ public class GroupMetadataManagerTest {
             assertTrue(followerJoinResult.records.isEmpty());
             assertFalse(followerJoinResult.joinFuture.isDone());
             assertTrue(group.isInState(PREPARING_REBALANCE));
+            assertEquals(2, group.size());
             assertEquals(1, group.numPendingJoinMembers());
 
-            return pendingMemberJoinResult.joinFuture.get();
+            return new PendingMemberGroupResult(
+                leaderJoinResponse.memberId(),
+                followerId,
+                pendingMemberJoinResult.joinFuture.get()
+            );
         }
 
         public void verifySessionExpiration(GenericGroup group, int timeoutMs) 
{
@@ -1099,6 +1106,28 @@ public class GroupMetadataManagerTest {
             return joinResponses;
         }
 
+        public CoordinatorResult<LeaveGroupResponseData, Record> 
sendGenericGroupLeave(
+            LeaveGroupRequestData request
+        ) {
+            RequestContext context = new RequestContext(
+                new RequestHeader(
+                    ApiKeys.LEAVE_GROUP,
+                    ApiKeys.LEAVE_GROUP.latestVersion(),
+                    "client",
+                    0
+                ),
+                "1",
+                InetAddress.getLoopbackAddress(),
+                KafkaPrincipal.ANONYMOUS,
+                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+                SecurityProtocol.PLAINTEXT,
+                ClientInformation.EMPTY,
+                false
+            );
+
+            return groupMetadataManager.genericGroupLeave(context, request);
+        }
+
         private ApiMessage messageOrNull(ApiMessageAndVersion 
apiMessageAndVersion) {
             if (apiMessageAndVersion == null) {
                 return null;
@@ -7730,7 +7759,7 @@ public class GroupMetadataManagerTest {
 
         // Set up a group in with a pending member. The test checks if the 
pending member joining
         // completes the rebalancing operation
-        JoinGroupResponseData pendingMemberResponse = 
context.setupGroupWithPendingMember(group);
+        JoinGroupResponseData pendingMemberResponse = 
context.setupGroupWithPendingMember(group).pendingMemberResponse;
 
         // Compete join group for the pending member
         JoinGroupRequestData request = new JoinGroupRequestBuilder()
@@ -8747,6 +8776,552 @@ public class GroupMetadataManagerTest {
         assertEquals(expectedGroupInstanceIds, groupInstanceIds);
     }
 
+    @Test
+    public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() 
throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        int longSessionTimeoutMs = 10000;
+        int rebalanceTimeoutMs = 5000;
+        RebalanceResult rebalanceResult = 
context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id",
+            rebalanceTimeoutMs,
+            longSessionTimeoutMs
+        );
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+
+        // New member joins
+        JoinResult joinResult = context.sendGenericGroupJoin(
+            new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withProtocolSuperset()
+                .withSessionTimeoutMs(longSessionTimeoutMs)
+                .build()
+        );
+
+        // The new dynamic member has been elected as leader
+        assertNoOrEmptyResult(context.sleep(rebalanceTimeoutMs));
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(Errors.NONE.code(), 
joinResult.joinFuture.get().errorCode());
+        assertEquals(joinResult.joinFuture.get().leader(), 
joinResult.joinFuture.get().memberId());
+        assertEquals(3, joinResult.joinFuture.get().members().size());
+        assertEquals(2, joinResult.joinFuture.get().generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        assertEquals(
+            mkSet(rebalanceResult.leaderId, rebalanceResult.followerId, 
joinResult.joinFuture.get().memberId()),
+            group.allMemberIds()
+        );
+        assertEquals(
+            mkSet(rebalanceResult.leaderId, rebalanceResult.followerId),
+            group.allStaticMemberIds()
+        );
+        assertEquals(
+            mkSet(joinResult.joinFuture.get().memberId()),
+            group.allDynamicMemberIds()
+        );
+
+        // Send a special leave group request from static follower, moving 
group towards PreparingRebalance
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId(rebalanceResult.followerId)
+                        .setGroupInstanceId("follower-instance-id")
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setMemberId(rebalanceResult.followerId)
+                    .setGroupInstanceId("follower-instance-id")));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        context.sleep(rebalanceTimeoutMs);
+        // Only static leader is maintained, and group is stuck at 
PreparingRebalance stage
+        assertTrue(group.allDynamicMemberIds().isEmpty());
+        assertEquals(Collections.singleton(rebalanceResult.leaderId), 
group.allMemberIds());
+        assertTrue(group.allDynamicMemberIds().isEmpty());
+        assertEquals(2, group.generationId());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+    }
+
+    @Test
+    public void testPendingMembersLeaveGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+        JoinGroupResponseData pendingJoinResponse = 
context.setupGroupWithPendingMember(group).pendingMemberResponse;
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId(pendingJoinResponse.memberId())
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(pendingJoinResponse.memberId())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(leaveResult.records().isEmpty());
+
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(2, group.allMembers().size());
+        assertEquals(2, group.allDynamicMemberIds().size());
+        assertEquals(0, group.numPendingJoinMembers());
+    }
+
+    @Test
+    public void testLeaveGroupInvalidGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("invalid-group-id")
+        ));
+    }
+
+    @Test
+    public void testLeaveGroupUnknownGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("unknown-group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId("member-id")
+                ))
+        ));
+    }
+
+    @Test
+    public void testLeaveGroupUnknownMemberIdExistingGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
+            new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withDefaultProtocolTypeAndProtocols()
+                .build()
+        );
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId("unknown-member-id")
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId("unknown-member-id")
+                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(leaveResult.records().isEmpty());
+    }
+
+    @Test
+    public void testLeaveDeadGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+        group.transitionTo(DEAD);
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId("member-id")
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(leaveResult.records().isEmpty());
+    }
+
+    @Test
+    public void testValidLeaveGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupResponseData joinResponse = 
context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
+            new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withDefaultProtocolTypeAndProtocols()
+                .build()
+        );
+
+        // Dynamic member leaves. The group becomes empty.
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId(joinResponse.memberId())
+                ))
+        );
+        assertEquals(
+            
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, 
MetadataVersion.latest())),
+            leaveResult.records()
+        );
+        // Simulate a successful write to the log.
+        leaveResult.appendFuture().complete(null);
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(joinResponse.memberId())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(group.isInState(EMPTY));
+        assertEquals(2, group.generationId());
+    }
+
+    @Test
+    public void testLeaveGroupWithFencedInstanceId() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        context.joinGenericGroupAndCompleteJoin(
+            new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withGroupInstanceId("group-instance-id")
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withDefaultProtocolTypeAndProtocols()
+                .build(),
+            true,
+            true
+        );
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setGroupInstanceId("group-instance-id")
+                        .setMemberId("other-member-id") // invalid member id
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("group-instance-id")
+                    .setMemberId("other-member-id")
+                    .setErrorCode(Errors.FENCED_INSTANCE_ID.code())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(leaveResult.records().isEmpty());
+    }
+
+    @Test
+    public void testLeaveGroupStaticMemberWithUnknownMemberId() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        context.joinGenericGroupAndCompleteJoin(
+            new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withGroupInstanceId("group-instance-id")
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withDefaultProtocolTypeAndProtocols()
+                .build(),
+            true,
+            true
+        );
+
+        // Having unknown member id will not affect the request processing due 
to valid group instance id.
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setGroupInstanceId("group-instance-id")
+                        .setMemberId(UNKNOWN_MEMBER_ID)
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("group-instance-id")));
+
+        assertEquals(expectedResponse, leaveResult.response());
+    }
+
+    @Test
+    public void testStaticMembersValidBatchLeaveGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(
+                    Arrays.asList(
+                        new MemberIdentity()
+                            .setGroupInstanceId("leader-instance-id"),
+                        new MemberIdentity()
+                            .setGroupInstanceId("follower-instance-id")
+                    )
+                )
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Arrays.asList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("leader-instance-id"),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("follower-instance-id")));
+
+        assertEquals(expectedResponse, leaveResult.response());
+    }
+
+    @Test
+    public void testStaticMembersLeaveUnknownGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("invalid-group-id") // Invalid group id
+                .setMembers(
+                    Arrays.asList(
+                        new MemberIdentity()
+                            .setGroupInstanceId("leader-instance-id"),
+                        new MemberIdentity()
+                            .setGroupInstanceId("follower-instance-id")
+                    )
+                )
+        ));
+    }
+
+    @Test
+    public void testStaticMembersFencedInstanceBatchLeaveGroup() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(
+                    Arrays.asList(
+                        new MemberIdentity()
+                            .setGroupInstanceId("leader-instance-id"),
+                        new MemberIdentity()
+                            .setGroupInstanceId("follower-instance-id")
+                            .setMemberId("invalid-member-id")
+                    )
+                )
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Arrays.asList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("leader-instance-id"),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("follower-instance-id")
+                    .setMemberId("invalid-member-id")
+                    .setErrorCode(Errors.FENCED_INSTANCE_ID.code())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+    }
+
+    @Test
+    public void testStaticMembersUnknownInstanceBatchLeaveGroup() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(
+                    Arrays.asList(
+                        new MemberIdentity()
+                            .setGroupInstanceId("unknown-instance-id"), // 
Unknown instance id
+                        new MemberIdentity()
+                            .setGroupInstanceId("follower-instance-id")
+                    )
+                )
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Arrays.asList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("unknown-instance-id")
+                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("follower-instance-id")));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(leaveResult.records().isEmpty());
+    }
+
+    @Test
+    public void testPendingMemberBatchLeaveGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+        JoinGroupResponseData pendingJoinResponse = 
context.setupGroupWithPendingMember(group).pendingMemberResponse;
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(
+                    Arrays.asList(
+                        new MemberIdentity()
+                            .setGroupInstanceId("unknown-instance-id"), // 
Unknown instance id
+                        new MemberIdentity()
+                            .setMemberId(pendingJoinResponse.memberId())
+                    )
+                )
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Arrays.asList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId("unknown-instance-id")
+                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(pendingJoinResponse.memberId())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+    }
+
+    @Test
+    public void testJoinedMemberPendingMemberBatchLeaveGroup() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+        PendingMemberGroupResult pendingMemberGroupResult = 
context.setupGroupWithPendingMember(group);
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(
+                    Arrays.asList(
+                        new MemberIdentity()
+                            .setMemberId(pendingMemberGroupResult.leaderId),
+                        new MemberIdentity()
+                            .setMemberId(pendingMemberGroupResult.followerId),
+                        new MemberIdentity()
+                            
.setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId())
+                    )
+                )
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Arrays.asList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(pendingMemberGroupResult.leaderId),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(pendingMemberGroupResult.followerId),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    
.setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+    }
+
+    @Test
+    public void 
testJoinedMemberPendingMemberBatchLeaveGroupWithUnknownMember() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+        PendingMemberGroupResult pendingMemberGroupResult = 
context.setupGroupWithPendingMember(group);
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(
+                    Arrays.asList(
+                        new MemberIdentity()
+                            .setMemberId(pendingMemberGroupResult.leaderId),
+                        new MemberIdentity()
+                            .setMemberId(pendingMemberGroupResult.followerId),
+                        new MemberIdentity()
+                            
.setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId()),
+                        new MemberIdentity()
+                            .setMemberId("unknown-member-id")
+                    )
+                )
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Arrays.asList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(pendingMemberGroupResult.leaderId),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(pendingMemberGroupResult.followerId),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    
.setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId()),
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId("unknown-member-id")
+                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+    }
+
     private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void, 
Record>> timeouts) {
         assertTrue(timeouts.size() <= 1);
         timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, 
timeout.result));
@@ -8970,6 +9545,22 @@ public class GroupMetadataManagerTest {
         }
     }
 
+    private static class PendingMemberGroupResult {
+        String leaderId;
+        String followerId;
+        JoinGroupResponseData pendingMemberResponse;
+
+        public PendingMemberGroupResult(
+            String leaderId,
+            String followerId,
+            JoinGroupResponseData pendingMemberResponse
+        ) {
+            this.leaderId = leaderId;
+            this.followerId = followerId;
+            this.pendingMemberResponse = pendingMemberResponse;
+        }
+    }
+
     private static class JoinResult {
         CompletableFuture<JoinGroupResponseData> joinFuture;
         List<Record> records;

Reply via email to