dajac commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1315455089
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -411,9 +413,37 @@ public CompletableFuture<LeaveGroupResponseData> leaveGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + if (!isGroupIdNotEmpty(request.groupId())) { + return CompletableFuture.completedFuture(new LeaveGroupResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code())); + } + Review Comment: nit: Let's remove this empty line. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -306,6 +308,22 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( return offsetMetadataManager.commitOffset(context, request); } + /** Review Comment: The javadoc is incorrect. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. 
+ */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member : request.members()) { + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. + if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { + if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { + coordinatorResult = removeCurrentMemberFromGenericGroup( + group, + group.staticMemberId(member.groupInstanceId()), + member.reason() + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + } + } else if (group.isPendingMember(member.memberId())) { + coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); + timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); + log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.", + member.memberId(), group.groupId()); + + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + try { + group.validateMember(member.memberId(), member.groupInstanceId(), 
"leave-group"); + coordinatorResult = removeCurrentMemberFromGenericGroup( Review Comment: The fact that we override `coordinatorResult` for each member worries me. Can't we have issues with this? For instance, let's say that the first member is removed from the group and the group becomes empty. This generates a record. Now, the next member in the request is a pending member. Wouldn't we lose the previous record in this case? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. + */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member : request.members()) { + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined.
+ if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { + if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { + coordinatorResult = removeCurrentMemberFromGenericGroup( + group, + group.staticMemberId(member.groupInstanceId()), + member.reason() + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + } + } else if (group.isPendingMember(member.memberId())) { + coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); + timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); + log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.", Review Comment: nit: Should we also log the reason here? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -411,9 +413,37 @@ public CompletableFuture<LeaveGroupResponseData> leaveGroup( return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); } - return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception( - "This API is not implemented yet." - )); + if (!isGroupIdNotEmpty(request.groupId())) { + return CompletableFuture.completedFuture(new LeaveGroupResponseData() + .setErrorCode(Errors.INVALID_GROUP_ID.code())); + } + + + return runtime.scheduleWriteOperation("generic-group-leave", Review Comment: nit: Let's put `"generic-group-leave"` on a new line in order to remain consistent with the existing code. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1099,6 +1101,42 @@ public List<JoinGroupResponseData> joinWithNMembers( return joinResponses; } + public CoordinatorResult<LeaveGroupResponseData, Record> sendGenericGroupLeave( + LeaveGroupRequestData request + ) { + RequestContext context = new RequestContext( + new RequestHeader( + ApiKeys.LEAVE_GROUP, + ApiKeys.LEAVE_GROUP.latestVersion(), + "client", + 0 + ), + "1", + InetAddress.getLoopbackAddress(), + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + ClientInformation.EMPTY, + false + ); + + return groupMetadataManager.genericGroupLeave(context, request); + } + + public void verifyLeaveGroupResponse( + LeaveGroupResponseData actualResponse, + Errors expectedTopLevelError, + List<Errors> expectedMemberLevelErrors + ) { + assertEquals(expectedTopLevelError.code(), actualResponse.errorCode()); + if (!expectedMemberLevelErrors.isEmpty()) { + assertEquals(expectedMemberLevelErrors.size(), actualResponse.members().size()); + for (int i = 0; i < expectedMemberLevelErrors.size(); i++) { + assertEquals(expectedMemberLevelErrors.get(i).code(), actualResponse.members().get(i).errorCode()); + } + } Review Comment: I am not a fan of this because we don't verify all the fields (e.g. member id) in the response. Should we generate the expected response and use `assertEquals(expected, actual)`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. 
+ */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), Review Comment: nit: Should we put `Collections.emptyList()` on a new line? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8574,6 +8612,7 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } +<<<<<<< HEAD Review Comment: This should be removed. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. 
+ */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member : request.members()) { + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. + if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { + if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { + coordinatorResult = removeCurrentMemberFromGenericGroup( + group, + group.staticMemberId(member.groupInstanceId()), + member.reason() + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + } + } else if (group.isPendingMember(member.memberId())) { + coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); + timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); + log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.", + member.memberId(), group.groupId()); + + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + try { + group.validateMember(member.memberId(), member.groupInstanceId(), 
"leave-group"); + coordinatorResult = removeCurrentMemberFromGenericGroup( + group, + member.memberId(), + member.reason() + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } catch (KafkaException e) { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.forException(e).code()) + ); + } + } + } + return new CoordinatorResult<>( + coordinatorResult.records(), + new LeaveGroupResponseData() + .setMembers(memberResponses), + coordinatorResult.appendFuture() + ); + } + + /** + * Remove a member from the group. Cancel member's heartbeat, and prepare rebalance + * or complete the join phase if necessary. + * + * @param group The generic group. + * @param memberId The member id. + * @param reason The reason for the LeaveGroup request. + * + * @return The GroupMetadata record and the append future to be completed once the record is + * appended to the log (and replicated). + */ + private CoordinatorResult<Void, Record> removeCurrentMemberFromGenericGroup( + GenericGroup group, + String memberId, + String reason + ) { + GenericGroupMember member = group.member(memberId); + reason = reason != null ? reason : "not provided"; + timer.cancel(genericGroupHeartbeatKey(group.groupId(), memberId)); + log.info("[Group {}] Member {} has left group through explicit `LeaveGroup` request; client reason: {}", + group.groupId(), memberId, reason); + + group.completeJoinFuture(member, + new JoinGroupResponseData() + .setMemberId(UNKNOWN_MEMBER_ID) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + group.remove(member.memberId()); + + switch (group.currentState()) { + case STABLE: + case COMPLETING_REBALANCE: + return maybePrepareRebalanceOrCompleteJoin(group, reason); + case PREPARING_REBALANCE: + timer.cancel(genericGroupJoinKey(group.groupId())); Review Comment: This does not seem correct to me. 
I think that we should cancel the timer only if we transition to the next state, no? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. + */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member : request.members()) { + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. + if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { Review Comment: nit: Would it make sense to use `UNKNOWN_MEMBER_ID.equals(member.memberId())`? That would make the code a little more robust in the case where `memberId` would be null. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. 
+ * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. + */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member : request.members()) { Review Comment: nit: The indentation is wrong for the `for` loop. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. 
+ */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member : request.members()) { + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. + if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { + if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { Review Comment: nit: I wonder if we should introduce `GenericGroup.isStaticMember` method to simplify the condition here? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2904,6 +2908,135 @@ private void validateGenericGroupHeartbeat( } } + /** + * Handle a generic group LeaveGroup request. + * + * @param context The request context. + * @param request The actual LeaveGroup request. + * + * @return The LeaveGroup response and the GroupMetadata record to append if the group + * no longer has any member. 
+ */ + public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( + RequestContext context, + LeaveGroupRequestData request + ) throws UnknownMemberIdException, GroupIdNotFoundException { + GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false); + if (group.isInState(DEAD)) { + return new CoordinatorResult<>(Collections.emptyList(), + new LeaveGroupResponseData() + .setErrorCode(COORDINATOR_NOT_AVAILABLE.code()) + ); + } + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + List<MemberResponse> memberResponses = new ArrayList<>(); + + for (MemberIdentity member : request.members()) { + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. + if (member.memberId().equals(UNKNOWN_MEMBER_ID)) { + if (member.groupInstanceId() != null && group.staticMemberId(member.groupInstanceId()) != null) { + coordinatorResult = removeCurrentMemberFromGenericGroup( + group, + group.staticMemberId(member.groupInstanceId()), + member.reason() + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + } + } else if (group.isPendingMember(member.memberId())) { + coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); + timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); + log.info("Pending member {} has left group {} through explicit `LeaveGroup` request.", + member.memberId(), group.groupId()); + + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + try { + group.validateMember(member.memberId(), member.groupInstanceId(), 
"leave-group"); + coordinatorResult = removeCurrentMemberFromGenericGroup( + group, + member.memberId(), + member.reason() + ); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } catch (KafkaException e) { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.forException(e).code()) + ); + } + } + } + return new CoordinatorResult<>( + coordinatorResult.records(), + new LeaveGroupResponseData() + .setMembers(memberResponses), + coordinatorResult.appendFuture() + ); + } + + /** + * Remove a member from the group. Cancel member's heartbeat, and prepare rebalance + * or complete the join phase if necessary. + * + * @param group The generic group. + * @param memberId The member id. + * @param reason The reason for the LeaveGroup request. + * + * @return The GroupMetadata record and the append future to be completed once the record is + * appended to the log (and replicated). + */ + private CoordinatorResult<Void, Record> removeCurrentMemberFromGenericGroup( + GenericGroup group, + String memberId, + String reason + ) { + GenericGroupMember member = group.member(memberId); + reason = reason != null ? reason : "not provided"; + timer.cancel(genericGroupHeartbeatKey(group.groupId(), memberId)); + log.info("[Group {}] Member {} has left group through explicit `LeaveGroup` request; client reason: {}", + group.groupId(), memberId, reason); + + group.completeJoinFuture(member, Review Comment: nit: Should we put `member` on a new line? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8745,6 +8784,429 @@ private static void checkJoinGroupResponse( .collect(Collectors.toSet()); assertEquals(expectedGroupInstanceIds, groupInstanceIds); +======= + @Test + public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + int longSessionTimeoutMs = 10000; + int rebalanceTimeoutMs = 5000; + RebalanceResult rebalanceResult = context.staticMembersJoinAndRebalance( + "group-id", + "leader-instance-id", + "follower-instance-id", + rebalanceTimeoutMs, + longSessionTimeoutMs + ); + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false); + + // New member joins + JoinResult joinResult = context.sendGenericGroupJoin( + new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .withSessionTimeoutMs(longSessionTimeoutMs) + .build() + ); + + // The new dynamic member has been elected as leader + assertNoOrEmptyResult(context.sleep(rebalanceTimeoutMs)); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(joinResult.joinFuture.get().leader(), joinResult.joinFuture.get().memberId()); + assertEquals(3, joinResult.joinFuture.get().members().size()); + assertEquals(2, joinResult.joinFuture.get().generationId()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + assertEquals( + mkSet(rebalanceResult.leaderId, rebalanceResult.followerId, joinResult.joinFuture.get().memberId()), + group.allMemberIds() + ); + assertEquals( + mkSet(rebalanceResult.leaderId, rebalanceResult.followerId), + group.allStaticMemberIds() + ); + assertEquals( + mkSet(joinResult.joinFuture.get().memberId()), + group.allDynamicMemberIds() + ); + + // Send 
a special leave group request from static follower, moving group towards PreparingRebalance + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId(rebalanceResult.followerId) + .setGroupInstanceId("follower-instance-id") + )) + ); + + assertTrue(leaveResult.records().isEmpty()); + context.verifyLeaveGroupResponse(leaveResult.response(), Errors.NONE, Collections.emptyList()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + context.sleep(rebalanceTimeoutMs); + // Only static leader is maintained, and group is stuck at PreparingRebalance stage + assertTrue(group.allDynamicMemberIds().isEmpty()); + assertEquals(Collections.singleton(rebalanceResult.leaderId), group.allMemberIds()); + assertTrue(group.allDynamicMemberIds().isEmpty()); + assertEquals(2, group.generationId()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + } + + @Test + public void testPendingMembersLeaveGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + JoinGroupResponseData pendingJoinResponse = context.setupGroupWithPendingMember(group); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId(pendingJoinResponse.memberId()) + )) + ); + assertTrue(leaveResult.records().isEmpty()); + context.verifyLeaveGroupResponse(leaveResult.response(), Errors.NONE, Collections.emptyList()); + + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertEquals(2, group.allMembers().size()); + assertEquals(2, group.allDynamicMemberIds().size()); + assertEquals(0, group.numPendingJoinMembers()); + } + + @Test + 
public void testLeaveGroupInvalidGroup() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("invalid-group-id") + )); + } + + @Test + public void testLeaveGroupUnknownGroup() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId("member-id") + )) + ); + + assertTrue(leaveResult.records().isEmpty()); + context.verifyLeaveGroupResponse( + leaveResult.response(), + Errors.NONE, + Collections.singletonList(Errors.UNKNOWN_MEMBER_ID) + ); + } + + @Test + public void testLeaveGroupUnknownConsumerExistingGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + context.joinGenericGroupAsDynamicMemberAndCompleteJoin( + new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build() + ); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId("other-member-id") + )) + ); + assertTrue(leaveResult.records().isEmpty()); + context.verifyLeaveGroupResponse( + leaveResult.response(), + Errors.NONE, + Collections.singletonList(Errors.UNKNOWN_MEMBER_ID) + ); + } + + @Test + public void testLeaveDeadGroup() { + GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + group.transitionTo(DEAD); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId("member-id") + )) + ); + + assertTrue(leaveResult.records().isEmpty()); + context.verifyLeaveGroupResponse( + leaveResult.response(), + Errors.COORDINATOR_NOT_AVAILABLE, + Collections.emptyList() + ); + } + + @Test + public void testValidLeaveGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + JoinGroupResponseData joinResponse = context.joinGenericGroupAsDynamicMemberAndCompleteJoin( + new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build() + ); + + // Dynamic member leaves. The group becomes empty. + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId(joinResponse.memberId()) + )) + ); + assertEquals( + Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())), + leaveResult.records() + ); + // Simulate a successful write to the log. 
+ leaveResult.appendFuture().complete(null); + + context.verifyLeaveGroupResponse(leaveResult.response(), Errors.NONE, Collections.emptyList()); + assertTrue(group.isInState(EMPTY)); + assertEquals(2, group.generationId()); + } + + @Test + public void testLeaveGroupWithFencedInstanceId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + context.joinGenericGroupAndCompleteJoin( + new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(), + true, + true + ); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setGroupInstanceId("group-instance-id") + .setMemberId("other-member-id") // invalid member id + )) + ); + assertTrue(leaveResult.records().isEmpty()); + context.verifyLeaveGroupResponse(leaveResult.response(), + Errors.NONE, + Collections.singletonList(Errors.FENCED_INSTANCE_ID) + ); + } + + @Test + public void testLeaveGroupStaticMemberWithUnknownMemberId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + context.joinGenericGroupAndCompleteJoin( + new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("group-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(), + true, + true + ); + + // Having unknown member id will not affect the request processing due to valid group instance id. 
+ CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers(Collections.singletonList( + new MemberIdentity() + .setGroupInstanceId("group-instance-id") + .setMemberId(UNKNOWN_MEMBER_ID) + )) + ); + context.verifyLeaveGroupResponse(leaveResult.response(), + Errors.NONE, + Collections.singletonList(Errors.NONE) + ); + } + + @Test + public void testStaticMembersValidBatchLeaveGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.staticMembersJoinAndRebalance( + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers( + Arrays.asList( + new MemberIdentity() + .setGroupInstanceId("leader-instance-id"), + new MemberIdentity() + .setGroupInstanceId("follower-instance-id") + ) + ) + ); + context.verifyLeaveGroupResponse( + leaveResult.response(), + Errors.NONE, + Arrays.asList(Errors.NONE, Errors.NONE) + ); + } + + @Test + public void testStaticMembersLeaveUnknownGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.staticMembersJoinAndRebalance( + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("invalid-group-id") // Invalid group id + .setMembers( + Arrays.asList( + new MemberIdentity() + .setGroupInstanceId("leader-instance-id"), + new MemberIdentity() + .setGroupInstanceId("follower-instance-id") + ) + ) + )); + } + + @Test + public void testStaticMembersFencedInstanceBatchLeaveGroup() throws Exception { + GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder() + .build(); + context.staticMembersJoinAndRebalance( + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers( + Arrays.asList( + new MemberIdentity() + .setGroupInstanceId("leader-instance-id"), + new MemberIdentity() + .setGroupInstanceId("follower-instance-id") + .setMemberId("invalid-member-id") + ) + ) + ); + context.verifyLeaveGroupResponse( + leaveResult.response(), + Errors.NONE, + Arrays.asList(Errors.NONE, Errors.FENCED_INSTANCE_ID) + ); + } + + @Test + public void testStaticMembersUnknownInstanceBatchLeaveGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.staticMembersJoinAndRebalance( + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers( + Arrays.asList( + new MemberIdentity() + .setGroupInstanceId("unknown-instance-id"), // Unknown instance id + new MemberIdentity() + .setGroupInstanceId("follower-instance-id") + ) + ) + ); + context.verifyLeaveGroupResponse( + leaveResult.response(), + Errors.NONE, + Arrays.asList(Errors.UNKNOWN_MEMBER_ID, Errors.NONE) + ); + } + + @Test + public void testPendingMemberBatchLeaveGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + JoinGroupResponseData pendingJoinResponse = context.setupGroupWithPendingMember(group); + + CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave( + new LeaveGroupRequestData() + .setGroupId("group-id") + .setMembers( + 
Arrays.asList( + new MemberIdentity() + .setGroupInstanceId("unknown-instance-id"), // Unknown instance id + new MemberIdentity() + .setMemberId(pendingJoinResponse.memberId()) + ) + ) + ); + context.verifyLeaveGroupResponse( + leaveResult.response(), + Errors.NONE, + Arrays.asList(Errors.UNKNOWN_MEMBER_ID, Errors.NONE) + ); +>>>>>>> 518dbe693c (implement LeaveGroup API) } Review Comment: I wonder if we should add a test with multiple members (e.g. a joined member, a pending member, etc.) that verifies that records are correctly handled in this case. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org