dajac commented on code in PR #14147:
URL: https://github.com/apache/kafka/pull/14147#discussion_r1318160262


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2930,72 +2930,86 @@ public CoordinatorResult<LeaveGroupResponseData, 
Record> genericGroupLeave(
             );
         }
 
-        CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT;
         List<MemberResponse> memberResponses = new ArrayList<>();
-
-            for (MemberIdentity member: request.members()) {
-                // The LeaveGroup API allows administrative removal of members 
by GroupInstanceId
-                // in which case we expect the MemberId to be undefined.
-                if (UNKNOWN_MEMBER_ID.equals(member.memberId())) {
-                    if (member.groupInstanceId() != null && 
group.hasStaticMember(member.groupInstanceId())) {
-                        coordinatorResult = 
removeCurrentMemberFromGenericGroup(
-                            group,
-                            group.staticMemberId(member.groupInstanceId()),
-                            member.reason()
-                        );
-                        memberResponses.add(
-                            new MemberResponse()
-                                .setMemberId(member.memberId())
-                                .setGroupInstanceId(member.groupInstanceId())
-                        );
-                    } else {
-                        memberResponses.add(
-                            new MemberResponse()
-                                .setMemberId(member.memberId())
-                                .setGroupInstanceId(member.groupInstanceId())
-                                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
-                        );
-                    }
-                } else if (group.isPendingMember(member.memberId())) {
-                    coordinatorResult = 
removePendingMemberAndUpdateGenericGroup(group, member.memberId());
-                    timer.cancel(genericGroupHeartbeatKey(group.groupId(), 
member.memberId()));
-                    log.info("Pending member {} has left group {} through 
explicit `LeaveGroup` request. Reason: {}",
-                        member.memberId(), group.groupId(), member.reason());
-
+        List<CoordinatorResult<Void, Record>> coordinatorResults = new 
ArrayList<>();
+
+        for (MemberIdentity member: request.members()) {
+            String reason = member.reason() != null ? member.reason() : "not 
provided";
+            // The LeaveGroup API allows administrative removal of members by 
GroupInstanceId
+            // in which case we expect the MemberId to be undefined.
+            if (UNKNOWN_MEMBER_ID.equals(member.memberId())) {
+                if (member.groupInstanceId() != null && 
group.hasStaticMember(member.groupInstanceId())) {
+                    coordinatorResults.add(removeCurrentMemberFromGenericGroup(
+                        group,
+                        group.staticMemberId(member.groupInstanceId()),
+                        reason
+                    ));
                     memberResponses.add(
                         new MemberResponse()
                             .setMemberId(member.memberId())
                             .setGroupInstanceId(member.groupInstanceId())
                     );
                 } else {
-                    try {
-                        group.validateMember(member.memberId(), 
member.groupInstanceId(), "leave-group");
-                        coordinatorResult = 
removeCurrentMemberFromGenericGroup(
-                            group,
-                            member.memberId(),
-                            member.reason()
-                        );
-                        memberResponses.add(
-                            new MemberResponse()
-                                .setMemberId(member.memberId())
-                                .setGroupInstanceId(member.groupInstanceId())
-                        );
-                    } catch (KafkaException e) {
-                        memberResponses.add(
-                            new MemberResponse()
-                                .setMemberId(member.memberId())
-                                .setGroupInstanceId(member.groupInstanceId())
-                                .setErrorCode(Errors.forException(e).code())
-                        );
-                    }
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                    );
+                }
+            } else if (group.isPendingMember(member.memberId())) {
+                
coordinatorResults.add(removePendingMemberAndUpdateGenericGroup(group, 
member.memberId()));
+                timer.cancel(genericGroupHeartbeatKey(group.groupId(), 
member.memberId()));
+                log.info("Pending member {} has left group {} through explicit 
`LeaveGroup` request. Reason: {}",
+                    member.memberId(), group.groupId(), reason);
+
+                memberResponses.add(
+                    new MemberResponse()
+                        .setMemberId(member.memberId())
+                        .setGroupInstanceId(member.groupInstanceId())
+                );
+            } else {
+                try {
+                    group.validateMember(member.memberId(), 
member.groupInstanceId(), "leave-group");
+                    coordinatorResults.add(removeCurrentMemberFromGenericGroup(
+                        group,
+                        member.memberId(),
+                        reason
+                    ));
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                    );
+                } catch (KafkaException e) {
+                    memberResponses.add(
+                        new MemberResponse()
+                            .setMemberId(member.memberId())
+                            .setGroupInstanceId(member.groupInstanceId())
+                            .setErrorCode(Errors.forException(e).code())
+                    );
                 }
             }
-            return new CoordinatorResult<>(
-                coordinatorResult.records(),
-                new LeaveGroupResponseData()
-                    .setMembers(memberResponses),
-                coordinatorResult.appendFuture()
-            );
+        }
+
+        List<CoordinatorResult<Void, Record>> results = 
coordinatorResults.stream()
+            .filter(result -> result != EMPTY_RESULT)
+            .collect(Collectors.toList());
+
+        CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT;
+
+        if (results.size() > 1) {
+            throw new IllegalStateException("Expected max 1 non-empty result 
but found: " + results.size());
+        } else if (results.size() == 1) {
+            coordinatorResult = coordinatorResults.get(0);
+        }

Review Comment:
   This seems a bit fragile to me. How about creating a new CoordinatorResult 
which contains all the records generated and a write future that will be used 
to complete all the write future of all the generated CoordinatorResults? I 
suppose that we could assume that a CoordinatorResult without records can be 
ignored. If we have only one CoordinatorResult, we could just return it as you 
did here. I think that we could wrap this in a new class, say 
CoordinatorResultAggregator/Builder. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to