dajac commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1043725962


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1571,53 +1571,47 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
-
-    def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
-    }
-
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
+    val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
+    val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]](
+      describeRequest.data.groups.size
+    )
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-            
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+        futures += newGroupCoordinator.describeGroup(

Review Comment:
   There is not particular reason, I guess. @hachikuji The reason I kept it 
like this is that we will need to fan out any way with the new controller as we 
will have multiple event loops. I suppose that we could say that this is an 
internal implementation details so not exposing it in the interface would make 
sense. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to