dajac commented on code in PR #14147:
URL: https://github.com/apache/kafka/pull/14147#discussion_r1322938184


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2904,6 +2908,147 @@ private void validateGenericGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a generic LeaveGroupRequest.
+     *
+     * @param context        The request context.
+     * @param request        The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the GroupMetadata record to append 
if the group
+     *         no longer has any members.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), 
false);
+        if (group.isInState(DEAD)) {
+            return new CoordinatorResult<>(
+                Collections.emptyList(),
+                new LeaveGroupResponseData()
+                    .setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        List<MemberResponse> memberResponses = new ArrayList<>();
+
+        for (MemberIdentity member: request.members()) {
+            String reason = member.reason() != null ? member.reason() : "not 
provided";
+            // The LeaveGroup API allows administrative removal of members by 
GroupInstanceId
+            // in which case we expect the MemberId to be undefined.
+            if (UNKNOWN_MEMBER_ID.equals(member.memberId())) {
+                if (member.groupInstanceId() != null && 
group.hasStaticMember(member.groupInstanceId())) {
+                    removeCurrentMemberFromGenericGroup(
+                        group,
+                        group.staticMemberId(member.groupInstanceId()),
+                        reason
+                    );
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                    );
+                } else {
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                    );
+                }
+            } else if (group.isPendingMember(member.memberId())) {
+                group.remove(member.memberId());
+                timer.cancel(genericGroupHeartbeatKey(group.groupId(), 
member.memberId()));
+                log.info("[Group {}] Pending member {} has left group through 
explicit `LeaveGroup` request; client  reason: {}",

Review Comment:
   nit: There is an extra space before `reason`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2904,6 +2908,147 @@ private void validateGenericGroupHeartbeat(
         }
     }
 
+    /**
+     * Handle a generic LeaveGroupRequest.
+     *
+     * @param context        The request context.
+     * @param request        The actual LeaveGroup request.
+     *
+     * @return The LeaveGroup response and the GroupMetadata record to append 
if the group
+     *         no longer has any members.
+     */
+    public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave(
+        RequestContext context,
+        LeaveGroupRequestData request
+    ) throws UnknownMemberIdException, GroupIdNotFoundException {
+        GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), 
false);
+        if (group.isInState(DEAD)) {
+            return new CoordinatorResult<>(
+                Collections.emptyList(),
+                new LeaveGroupResponseData()
+                    .setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
+            );
+        }
+
+        List<MemberResponse> memberResponses = new ArrayList<>();
+
+        for (MemberIdentity member: request.members()) {
+            String reason = member.reason() != null ? member.reason() : "not 
provided";
+            // The LeaveGroup API allows administrative removal of members by 
GroupInstanceId
+            // in which case we expect the MemberId to be undefined.
+            if (UNKNOWN_MEMBER_ID.equals(member.memberId())) {
+                if (member.groupInstanceId() != null && 
group.hasStaticMember(member.groupInstanceId())) {
+                    removeCurrentMemberFromGenericGroup(
+                        group,
+                        group.staticMemberId(member.groupInstanceId()),
+                        reason
+                    );
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                    );
+                } else {
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                    );
+                }
+            } else if (group.isPendingMember(member.memberId())) {
+                group.remove(member.memberId());
+                timer.cancel(genericGroupHeartbeatKey(group.groupId(), 
member.memberId()));
+                log.info("[Group {}] Pending member {} has left group through 
explicit `LeaveGroup` request; client  reason: {}",
+                    group.groupId(), member.memberId(), reason);
+
+                memberResponses.add(
+                    new MemberResponse()
+                        .setMemberId(member.memberId())
+                        .setGroupInstanceId(member.groupInstanceId())
+                );
+            } else {
+                try {
+                    group.validateMember(member.memberId(), 
member.groupInstanceId(), "leave-group");
+                    removeCurrentMemberFromGenericGroup(
+                        group,
+                        member.memberId(),
+                        reason
+                    );
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                    );
+                } catch (KafkaException e) {
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                            .setErrorCode(Errors.forException(e).code())
+                    );
+                }
+            }
+        }
+
+        List<String> validLeaveGroupMembers = memberResponses.stream()
+            .filter(response -> response.errorCode() == 0)

Review Comment:
   nit: Errors.NONE.code()?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8747,6 +8776,563 @@ private static void checkJoinGroupResponse(
         assertEquals(expectedGroupInstanceIds, groupInstanceIds);
     }
 
+    @Test
+    public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() 
throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        int longSessionTimeoutMs = 10000;
+        int rebalanceTimeoutMs = 5000;
+        RebalanceResult rebalanceResult = 
context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id",
+            rebalanceTimeoutMs,
+            longSessionTimeoutMs
+        );
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+
+        // New member joins
+        JoinResult joinResult = context.sendGenericGroupJoin(
+            new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withProtocolSuperset()
+                .withSessionTimeoutMs(longSessionTimeoutMs)
+                .build()
+        );
+
+        // The new dynamic member has been elected as leader
+        assertNoOrEmptyResult(context.sleep(rebalanceTimeoutMs));
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(Errors.NONE.code(), 
joinResult.joinFuture.get().errorCode());
+        assertEquals(joinResult.joinFuture.get().leader(), 
joinResult.joinFuture.get().memberId());
+        assertEquals(3, joinResult.joinFuture.get().members().size());
+        assertEquals(2, joinResult.joinFuture.get().generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        assertEquals(
+            mkSet(rebalanceResult.leaderId, rebalanceResult.followerId, 
joinResult.joinFuture.get().memberId()),
+            group.allMemberIds()
+        );
+        assertEquals(
+            mkSet(rebalanceResult.leaderId, rebalanceResult.followerId),
+            group.allStaticMemberIds()
+        );
+        assertEquals(
+            mkSet(joinResult.joinFuture.get().memberId()),
+            group.allDynamicMemberIds()
+        );
+
+        // Send a special leave group request from static follower, moving 
group towards PreparingRebalance
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId(rebalanceResult.followerId)
+                        .setGroupInstanceId("follower-instance-id")
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setMemberId(rebalanceResult.followerId)
+                    .setGroupInstanceId("follower-instance-id")));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        context.sleep(rebalanceTimeoutMs);
+        // Only static leader is maintained, and group is stuck at 
PreparingRebalance stage
+        assertTrue(group.allDynamicMemberIds().isEmpty());
+        assertEquals(Collections.singleton(rebalanceResult.leaderId), 
group.allMemberIds());
+        assertTrue(group.allDynamicMemberIds().isEmpty());
+        assertEquals(2, group.generationId());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+    }
+
+    @Test
+    public void testPendingMembersLeaveGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+        JoinGroupResponseData pendingJoinResponse = 
context.setupGroupWithPendingMember(group).pendingMemberResponse;
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId(pendingJoinResponse.memberId())
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(pendingJoinResponse.memberId())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(leaveResult.records().isEmpty());
+
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(2, group.allMembers().size());
+        assertEquals(2, group.allDynamicMemberIds().size());
+        assertEquals(0, group.numPendingJoinMembers());
+    }
+
+    @Test
+    public void testLeaveGroupInvalidGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("invalid-group-id")
+        ));
+    }
+
+    @Test
+    public void testLeaveGroupUnknownGroup() {

Review Comment:
   nit: UnknownMemnberId? The group seems to be correct here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8747,6 +8776,563 @@ private static void checkJoinGroupResponse(
         assertEquals(expectedGroupInstanceIds, groupInstanceIds);
     }
 
+    @Test
+    public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() 
throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        int longSessionTimeoutMs = 10000;
+        int rebalanceTimeoutMs = 5000;
+        RebalanceResult rebalanceResult = 
context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id",
+            rebalanceTimeoutMs,
+            longSessionTimeoutMs
+        );
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+
+        // New member joins
+        JoinResult joinResult = context.sendGenericGroupJoin(
+            new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withProtocolSuperset()
+                .withSessionTimeoutMs(longSessionTimeoutMs)
+                .build()
+        );
+
+        // The new dynamic member has been elected as leader
+        assertNoOrEmptyResult(context.sleep(rebalanceTimeoutMs));
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(Errors.NONE.code(), 
joinResult.joinFuture.get().errorCode());
+        assertEquals(joinResult.joinFuture.get().leader(), 
joinResult.joinFuture.get().memberId());
+        assertEquals(3, joinResult.joinFuture.get().members().size());
+        assertEquals(2, joinResult.joinFuture.get().generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        assertEquals(
+            mkSet(rebalanceResult.leaderId, rebalanceResult.followerId, 
joinResult.joinFuture.get().memberId()),
+            group.allMemberIds()
+        );
+        assertEquals(
+            mkSet(rebalanceResult.leaderId, rebalanceResult.followerId),
+            group.allStaticMemberIds()
+        );
+        assertEquals(
+            mkSet(joinResult.joinFuture.get().memberId()),
+            group.allDynamicMemberIds()
+        );
+
+        // Send a special leave group request from static follower, moving 
group towards PreparingRebalance
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId(rebalanceResult.followerId)
+                        .setGroupInstanceId("follower-instance-id")
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setMemberId(rebalanceResult.followerId)
+                    .setGroupInstanceId("follower-instance-id")));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        context.sleep(rebalanceTimeoutMs);
+        // Only static leader is maintained, and group is stuck at 
PreparingRebalance stage
+        assertTrue(group.allDynamicMemberIds().isEmpty());
+        assertEquals(Collections.singleton(rebalanceResult.leaderId), 
group.allMemberIds());
+        assertTrue(group.allDynamicMemberIds().isEmpty());
+        assertEquals(2, group.generationId());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+    }
+
+    @Test
+    public void testPendingMembersLeaveGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+        JoinGroupResponseData pendingJoinResponse = 
context.setupGroupWithPendingMember(group).pendingMemberResponse;
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId(pendingJoinResponse.memberId())
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId(pendingJoinResponse.memberId())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(leaveResult.records().isEmpty());
+
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(2, group.allMembers().size());
+        assertEquals(2, group.allDynamicMemberIds().size());
+        assertEquals(0, group.numPendingJoinMembers());
+    }
+
+    @Test
+    public void testLeaveGroupInvalidGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("invalid-group-id")
+        ));
+    }
+
+    @Test
+    public void testLeaveGroupUnknownGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = 
context.sendGenericGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId("group-id")
+                .setMembers(Collections.singletonList(
+                    new MemberIdentity()
+                        .setMemberId("member-id")
+                ))
+        );
+
+        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+            .setMembers(Collections.singletonList(
+                new LeaveGroupResponseData.MemberResponse()
+                    .setGroupInstanceId(null)
+                    .setMemberId("member-id")
+                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())));
+
+        assertEquals(expectedResponse, leaveResult.response());
+        assertTrue(leaveResult.records().isEmpty());
+    }
+
+    @Test
+    public void testLeaveGroupUnknownConsumerExistingGroup() throws Exception {

Review Comment:
   This one looks pretty similar to the previous one, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to