This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f9a09fdd294 MINOR: Small refactor in DescribeGroupsResponse (#12970)
f9a09fdd294 is described below

commit f9a09fdd2948a3b29af9629cc993683f7e7fc66d
Author: David Jacot <[email protected]>
AuthorDate: Fri Dec 9 14:15:53 2022 +0100

    MINOR: Small refactor in DescribeGroupsResponse (#12970)
    
    This patch does a few cleanups:
    * It removes `DescribeGroupsResponse.fromError` and moves its logic into
      `DescribeGroupsRequest.getErrorResponse`, to be consistent with how the
      other requests/responses are implemented.
    * It renames `DescribeGroupsResponse.forError` (the factory that builds an
      error `DescribedGroup`) to `DescribeGroupsResponse.groupError`.
    
    The patch relies on existing tests.
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../common/requests/DescribeGroupsRequest.java     | 18 ++++++++++-----
 .../common/requests/DescribeGroupsResponse.java    | 26 ++++++++--------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 3 files changed, 22 insertions(+), 24 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index eff5bb9fff1..25afb432cb7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -17,14 +17,13 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
 
-import static 
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
-
 public class DescribeGroupsRequest extends AbstractRequest {
     public static class Builder extends 
AbstractRequest.Builder<DescribeGroupsRequest> {
         private final DescribeGroupsRequestData data;
@@ -59,11 +58,18 @@ public class DescribeGroupsRequest extends AbstractRequest {
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        if (version() == 0) {
-            return DescribeGroupsResponse.fromError(DEFAULT_THROTTLE_TIME, 
Errors.forException(e), data.groups());
-        } else {
-            return DescribeGroupsResponse.fromError(throttleTimeMs, 
Errors.forException(e), data.groups());
+        Errors error = Errors.forException(e);
+        DescribeGroupsResponseData describeGroupsResponseData = new 
DescribeGroupsResponseData();
+
+        data.groups().forEach(groupId ->
+            
describeGroupsResponseData.groups().add(DescribeGroupsResponse.groupError(groupId,
 error))
+        );
+
+        if (version() >= 1) {
+            describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
         }
+
+        return new DescribeGroupsResponse(describeGroupsResponseData);
     }
 
     public static DescribeGroupsRequest parse(ByteBuffer buffer, short 
version) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 119bedfdb19..bd341e19f9a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -93,16 +93,21 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
         final String protocolType,
         final String protocol,
         final List<DescribedGroupMember> members,
-        final int authorizedOperations) {
-        DescribedGroup groupMetadata = new DescribedGroup();
-        groupMetadata.setGroupId(groupId)
+        final int authorizedOperations
+    ) {
+        return new DescribedGroup()
+            .setGroupId(groupId)
             .setErrorCode(error.code())
             .setGroupState(state)
             .setProtocolType(protocolType)
             .setProtocolData(protocol)
             .setMembers(members)
             .setAuthorizedOperations(authorizedOperations);
-        return  groupMetadata;
+    }
+
+    public static DescribedGroup groupError(String groupId, Errors error) {
+        return groupMetadata(groupId, error, 
DescribeGroupsResponse.UNKNOWN_STATE, 
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+            DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), 
AUTHORIZED_OPERATIONS_OMITTED);
     }
 
     @Override
@@ -132,19 +137,6 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
         return errorCounts;
     }
 
-    public static DescribedGroup forError(String groupId, Errors error) {
-        return groupMetadata(groupId, error, 
DescribeGroupsResponse.UNKNOWN_STATE, 
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
-                DescribeGroupsResponse.UNKNOWN_PROTOCOL, 
Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED);
-    }
-
-    public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors 
error, List<String> groupIds) {
-        DescribeGroupsResponseData describeGroupsResponseData = new 
DescribeGroupsResponseData();
-        describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
-        for (String groupId : groupIds)
-            
describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId,
 error));
-        return new DescribeGroupsResponse(describeGroupsResponseData);
-    }
-
     public static DescribeGroupsResponse parse(ByteBuffer buffer, short 
version) {
         return new DescribeGroupsResponse(new DescribeGroupsResponseData(new 
ByteBufferAccessor(buffer), version));
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 050122dc296..c8be15a58a5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1586,7 +1586,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+        
describeGroupsResponseData.groups.add(DescribeGroupsResponse.groupError(groupId,
 Errors.GROUP_AUTHORIZATION_FAILED))
       } else {
         val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
         val members = summary.members.map { member =>

Reply via email to