hachikuji commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043778147


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+    val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-            
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   I don't have a strong opinion, but I slightly prefer keeping `KafkaApis` 
simple and having the complexity in the coordinator implementation. I also 
wonder if that makes optimization easier. For example, even if we have fanout 
to the respective coordinators, it may still be the case that multiple groups 
are handled by the same coordinator. Perhaps that would mean fewer messages on 
the respective queues? Not sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to