dajac commented on code in PR #14147: URL: https://github.com/apache/kafka/pull/14147#discussion_r1318160262
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2930,72 +2930,86 @@ public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave( ); } - CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; List<MemberResponse> memberResponses = new ArrayList<>(); - - for (MemberIdentity member: request.members()) { - // The LeaveGroup API allows administrative removal of members by GroupInstanceId - // in which case we expect the MemberId to be undefined. - if (UNKNOWN_MEMBER_ID.equals(member.memberId())) { - if (member.groupInstanceId() != null && group.hasStaticMember(member.groupInstanceId())) { - coordinatorResult = removeCurrentMemberFromGenericGroup( - group, - group.staticMemberId(member.groupInstanceId()), - member.reason() - ); - memberResponses.add( - new MemberResponse() - .setMemberId(member.memberId()) - .setGroupInstanceId(member.groupInstanceId()) - ); - } else { - memberResponses.add( - new MemberResponse() - .setMemberId(member.memberId()) - .setGroupInstanceId(member.groupInstanceId()) - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) - ); - } - } else if (group.isPendingMember(member.memberId())) { - coordinatorResult = removePendingMemberAndUpdateGenericGroup(group, member.memberId()); - timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); - log.info("Pending member {} has left group {} through explicit `LeaveGroup` request. Reason: {}", - member.memberId(), group.groupId(), member.reason()); - + List<CoordinatorResult<Void, Record>> coordinatorResults = new ArrayList<>(); + + for (MemberIdentity member: request.members()) { + String reason = member.reason() != null ? member.reason() : "not provided"; + // The LeaveGroup API allows administrative removal of members by GroupInstanceId + // in which case we expect the MemberId to be undefined. 
+ if (UNKNOWN_MEMBER_ID.equals(member.memberId())) { + if (member.groupInstanceId() != null && group.hasStaticMember(member.groupInstanceId())) { + coordinatorResults.add(removeCurrentMemberFromGenericGroup( + group, + group.staticMemberId(member.groupInstanceId()), + reason + )); memberResponses.add( new MemberResponse() .setMemberId(member.memberId()) .setGroupInstanceId(member.groupInstanceId()) ); } else { - try { - group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group"); - coordinatorResult = removeCurrentMemberFromGenericGroup( - group, - member.memberId(), - member.reason() - ); - memberResponses.add( - new MemberResponse() - .setMemberId(member.memberId()) - .setGroupInstanceId(member.groupInstanceId()) - ); - } catch (KafkaException e) { - memberResponses.add( - new MemberResponse() - .setMemberId(member.memberId()) - .setGroupInstanceId(member.groupInstanceId()) - .setErrorCode(Errors.forException(e).code()) - ); - } + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + ); + } + } else if (group.isPendingMember(member.memberId())) { + coordinatorResults.add(removePendingMemberAndUpdateGenericGroup(group, member.memberId())); + timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId())); + log.info("Pending member {} has left group {} through explicit `LeaveGroup` request. 
Reason: {}", + member.memberId(), group.groupId(), reason); + + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } else { + try { + group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group"); + coordinatorResults.add(removeCurrentMemberFromGenericGroup( + group, + member.memberId(), + reason + )); + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + ); + } catch (KafkaException e) { + memberResponses.add( + new MemberResponse() + .setMemberId(member.memberId()) + .setGroupInstanceId(member.groupInstanceId()) + .setErrorCode(Errors.forException(e).code()) + ); } } - return new CoordinatorResult<>( - coordinatorResult.records(), - new LeaveGroupResponseData() - .setMembers(memberResponses), - coordinatorResult.appendFuture() - ); + } + + List<CoordinatorResult<Void, Record>> results = coordinatorResults.stream() + .filter(result -> result != EMPTY_RESULT) + .collect(Collectors.toList()); + + CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT; + + if (results.size() > 1) { + throw new IllegalStateException("Expected max 1 non-empty result but found: " + results.size()); + } else if (results.size() == 1) { + coordinatorResult = coordinatorResults.get(0); + } Review Comment: This seems a bit fragile to me. How about creating a new CoordinatorResult which contains all the records generated and a write future that will be used to complete the write futures of all the generated CoordinatorResults? I suppose that we could assume that a CoordinatorResult without records can be ignored. If we have only one CoordinatorResult, we could just return it as you did here. I think that we could wrap this in a new class, say CoordinatorResultAggregator/Builder. What do you think? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org