dajac commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1322938184
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,147 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic LeaveGroupRequest. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any members. + */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>( + Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member: request.members()) { + String reason = member.reason() != null ? member.reason() : "not provided"; + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. 
+ if (UNKNOWN_MEMBER_ID.equals(member.memberId())) { + if (member.groupInstanceId() != null && group.hasStaticMember(member.groupInstanceId())) { + removeCurrentMemberFromGenericGroup( + group, + group.staticMemberId(member.groupInstanceId()), + reason + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + } + } else if (group.isPendingMember(member.memberId())) { + group.remove(member.memberId()); + timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); + log.info("[Group {}] Pending member {} has left group through explicit `LeaveGroup` request; client reason: {}", Review Comment: nit: There is an extra space before `reason`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,147 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic LeaveGroupRequest. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any members. 
+ */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>( + Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member: request.members()) { + String reason = member.reason() != null ? member.reason() : "not provided"; + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. + if (UNKNOWN_MEMBER_ID.equals(member.memberId())) { + if (member.groupInstanceId() != null && group.hasStaticMember(member.groupInstanceId())) { + removeCurrentMemberFromGenericGroup( + group, + group.staticMemberId(member.groupInstanceId()), + reason + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + } + } else if (group.isPendingMember(member.memberId())) { + group.remove(member.memberId()); + timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); + log.info("[Group {}] Pending member {} has left group through explicit `LeaveGroup` request; client reason: {}", + group.groupId(), member.memberId(), reason); + + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + try { + group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group"); + removeCurrentMemberFromGenericGroup( 
+ group, + member.memberId(), + reason + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } catch (KafkaException e) { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.forException(e).code()) + ); + } + } + } + + List<String> validLeaveGroupMembers = memberResponses.stream() + .filter(response -> response.errorCode() == 0) Review Comment: nit: Errors.NONE.code()? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8747,6 +8776,563 @@ private static void checkJoinGroupResponse( assertEquals(expectedGroupInstanceIds, groupInstanceIds); } + @Test + public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + int longSessionTimeoutMs = 10000; + int rebalanceTimeoutMs = 5000; + RebalanceResult rebalanceResult = context.staticMembersJoinAndRebalance( + "group-id", + "leader-instance-id", + "follower-instance-id", + rebalanceTimeoutMs, + longSessionTimeoutMs + ); + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false); + + // New member joins + JoinResult joinResult = context.sendGenericGroupJoin( + new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .withSessionTimeoutMs(longSessionTimeoutMs) + .build() + ); + + // The new dynamic member has been elected as leader + assertNoOrEmptyResult(context.sleep(rebalanceTimeoutMs)); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(joinResult.joinFuture.get().leader(), joinResult.joinFuture.get().memberId()); + assertEquals(3, 
joinResult.joinFuture.get().members().size()); + assertEquals(2, joinResult.joinFuture.get().generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + assertEquals( + mkSet(rebalanceResult.leaderId, rebalanceResult.followerId, joinResult.joinFuture.get().memberId()), + group.allMemberIds() + ); + assertEquals( + mkSet(rebalanceResult.leaderId, rebalanceResult.followerId), + group.allStaticMemberIds() + ); + assertEquals( + mkSet(joinResult.joinFuture.get().memberId()), + group.allDynamicMemberIds() + ); + + // Send a special leave group request from static follower, moving group towards PreparingRebalance + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId(rebalanceResult.followerId) + .setGroupInstanceId("follower-instance-id") + )) + ); + + LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData() + .setMembers(Collections.singletonList( + new LeaveGroupResponseData.MemberResponse() + .setMemberId(rebalanceResult.followerId) + .setGroupInstanceId("follower-instance-id"))); + + assertEquals(expectedResponse, leaveResult.response()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + context.sleep(rebalanceTimeoutMs); + // Only static leader is maintained, and group is stuck at PreparingRebalance stage + assertTrue(group.allDynamicMemberIds().isEmpty()); + assertEquals(Collections.singleton(rebalanceResult.leaderId), group.allMemberIds()); + assertTrue(group.allDynamicMemberIds().isEmpty()); + assertEquals(2, group.generationId()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + } + + @Test + public void testPendingMembersLeaveGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + JoinGroupResponseData 
pendingJoinResponse = context.setupGroupWithPendingMember(group).pendingMemberResponse; + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId(pendingJoinResponse.memberId()) + )) + ); + + LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData() + .setMembers(Collections.singletonList( + new LeaveGroupResponseData.MemberResponse() + .setGroupInstanceId(null) + .setMemberId(pendingJoinResponse.memberId()))); + + assertEquals(expectedResponse, leaveResult.response()); + assertTrue(leaveResult.records().isEmpty()); + + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertEquals(2, group.allMembers().size()); + assertEquals(2, group.allDynamicMemberIds().size()); + assertEquals(0, group.numPendingJoinMembers()); + } + + @Test + public void testLeaveGroupInvalidGroup() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("invalid-group-id") + )); + } + + @Test + public void testLeaveGroupUnknownGroup() { Review Comment: nit: Should this be named `testLeaveGroupUnknownMemberId`? The group seems to be correct here.
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8747,6 +8776,563 @@ private static void checkJoinGroupResponse( assertEquals(expectedGroupInstanceIds, groupInstanceIds); } + @Test + public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + int longSessionTimeoutMs = 10000; + int rebalanceTimeoutMs = 5000; + RebalanceResult rebalanceResult = context.staticMembersJoinAndRebalance( + "group-id", + "leader-instance-id", + "follower-instance-id", + rebalanceTimeoutMs, + longSessionTimeoutMs + ); + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false); + + // New member joins + JoinResult joinResult = context.sendGenericGroupJoin( + new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .withSessionTimeoutMs(longSessionTimeoutMs) + .build() + ); + + // The new dynamic member has been elected as leader + assertNoOrEmptyResult(context.sleep(rebalanceTimeoutMs)); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(joinResult.joinFuture.get().leader(), joinResult.joinFuture.get().memberId()); + assertEquals(3, joinResult.joinFuture.get().members().size()); + assertEquals(2, joinResult.joinFuture.get().generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + assertEquals( + mkSet(rebalanceResult.leaderId, rebalanceResult.followerId, joinResult.joinFuture.get().memberId()), + group.allMemberIds() + ); + assertEquals( + mkSet(rebalanceResult.leaderId, rebalanceResult.followerId), + group.allStaticMemberIds() + ); + assertEquals( + mkSet(joinResult.joinFuture.get().memberId()), + group.allDynamicMemberIds() + ); + + // Send a special leave group request from 
static follower, moving group towards PreparingRebalance + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId(rebalanceResult.followerId) + .setGroupInstanceId("follower-instance-id") + )) + ); + + LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData() + .setMembers(Collections.singletonList( + new LeaveGroupResponseData.MemberResponse() + .setMemberId(rebalanceResult.followerId) + .setGroupInstanceId("follower-instance-id"))); + + assertEquals(expectedResponse, leaveResult.response()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + context.sleep(rebalanceTimeoutMs); + // Only static leader is maintained, and group is stuck at PreparingRebalance stage + assertTrue(group.allDynamicMemberIds().isEmpty()); + assertEquals(Collections.singleton(rebalanceResult.leaderId), group.allMemberIds()); + assertTrue(group.allDynamicMemberIds().isEmpty()); + assertEquals(2, group.generationId()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + } + + @Test + public void testPendingMembersLeaveGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + JoinGroupResponseData pendingJoinResponse = context.setupGroupWithPendingMember(group).pendingMemberResponse; + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId(pendingJoinResponse.memberId()) + )) + ); + + LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData() + .setMembers(Collections.singletonList( + new LeaveGroupResponseData.MemberResponse() + .setGroupInstanceId(null) + 
.setMemberId(pendingJoinResponse.memberId()))); + + assertEquals(expectedResponse, leaveResult.response()); + assertTrue(leaveResult.records().isEmpty()); + + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertEquals(2, group.allMembers().size()); + assertEquals(2, group.allDynamicMemberIds().size()); + assertEquals(0, group.numPendingJoinMembers()); + } + + @Test + public void testLeaveGroupInvalidGroup() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("invalid-group-id") + )); + } + + @Test + public void testLeaveGroupUnknownGroup() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId("member-id") + )) + ); + + LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData() + .setMembers(Collections.singletonList( + new LeaveGroupResponseData.MemberResponse() + .setGroupInstanceId(null) + .setMemberId("member-id") + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()))); + + assertEquals(expectedResponse, leaveResult.response()); + assertTrue(leaveResult.records().isEmpty()); + } + + @Test + public void testLeaveGroupUnknownConsumerExistingGroup() throws Exception { Review Comment: This one looks pretty similar to the previous one, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org