This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f9a09fdd294 MINOR: Small refactor in DescribeGroupsResponse (#12970)
f9a09fdd294 is described below
commit f9a09fdd2948a3b29af9629cc993683f7e7fc66d
Author: David Jacot <[email protected]>
AuthorDate: Fri Dec 9 14:15:53 2022 +0100
MINOR: Small refactor in DescribeGroupsResponse (#12970)
This patch does a few cleanups:
* It removes `DescribeGroupsResponse.fromError` and pushes its logic to
`DescribeGroupsRequest.getErrorResponse` to be consistent with how we
implemented the other requests/responses.
* It renames `DescribedGroup.forError` to `DescribedGroup.groupError`.
The patch relies on existing tests.
Reviewers: Mickael Maison <[email protected]>
---
.../common/requests/DescribeGroupsRequest.java | 18 ++++++++++-----
.../common/requests/DescribeGroupsResponse.java | 26 ++++++++--------------
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
3 files changed, 22 insertions(+), 24 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index eff5bb9fff1..25afb432cb7 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -17,14 +17,13 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
-import static
org.apache.kafka.common.requests.AbstractResponse.DEFAULT_THROTTLE_TIME;
-
public class DescribeGroupsRequest extends AbstractRequest {
public static class Builder extends
AbstractRequest.Builder<DescribeGroupsRequest> {
private final DescribeGroupsRequestData data;
@@ -59,11 +58,18 @@ public class DescribeGroupsRequest extends AbstractRequest {
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- if (version() == 0) {
- return DescribeGroupsResponse.fromError(DEFAULT_THROTTLE_TIME,
Errors.forException(e), data.groups());
- } else {
- return DescribeGroupsResponse.fromError(throttleTimeMs,
Errors.forException(e), data.groups());
+ Errors error = Errors.forException(e);
+ DescribeGroupsResponseData describeGroupsResponseData = new
DescribeGroupsResponseData();
+
+ data.groups().forEach(groupId ->
+
describeGroupsResponseData.groups().add(DescribeGroupsResponse.groupError(groupId,
error))
+ );
+
+ if (version() >= 1) {
+ describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
}
+
+ return new DescribeGroupsResponse(describeGroupsResponseData);
}
public static DescribeGroupsRequest parse(ByteBuffer buffer, short
version) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 119bedfdb19..bd341e19f9a 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -93,16 +93,21 @@ public class DescribeGroupsResponse extends
AbstractResponse {
final String protocolType,
final String protocol,
final List<DescribedGroupMember> members,
- final int authorizedOperations) {
- DescribedGroup groupMetadata = new DescribedGroup();
- groupMetadata.setGroupId(groupId)
+ final int authorizedOperations
+ ) {
+ return new DescribedGroup()
+ .setGroupId(groupId)
.setErrorCode(error.code())
.setGroupState(state)
.setProtocolType(protocolType)
.setProtocolData(protocol)
.setMembers(members)
.setAuthorizedOperations(authorizedOperations);
- return groupMetadata;
+ }
+
+ public static DescribedGroup groupError(String groupId, Errors error) {
+ return groupMetadata(groupId, error,
DescribeGroupsResponse.UNKNOWN_STATE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+ DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(),
AUTHORIZED_OPERATIONS_OMITTED);
}
@Override
@@ -132,19 +137,6 @@ public class DescribeGroupsResponse extends
AbstractResponse {
return errorCounts;
}
- public static DescribedGroup forError(String groupId, Errors error) {
- return groupMetadata(groupId, error,
DescribeGroupsResponse.UNKNOWN_STATE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
- DescribeGroupsResponse.UNKNOWN_PROTOCOL,
Collections.emptyList(), AUTHORIZED_OPERATIONS_OMITTED);
- }
-
- public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors
error, List<String> groupIds) {
- DescribeGroupsResponseData describeGroupsResponseData = new
DescribeGroupsResponseData();
- describeGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
- for (String groupId : groupIds)
-
describeGroupsResponseData.groups().add(DescribeGroupsResponse.forError(groupId,
error));
- return new DescribeGroupsResponse(describeGroupsResponseData);
- }
-
public static DescribeGroupsResponse parse(ByteBuffer buffer, short
version) {
return new DescribeGroupsResponse(new DescribeGroupsResponseData(new
ByteBufferAccessor(buffer), version));
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 050122dc296..c8be15a58a5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1586,7 +1586,7 @@ class KafkaApis(val requestChannel: RequestChannel,
describeRequest.data.groups.forEach { groupId =>
if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId,
Errors.GROUP_AUTHORIZATION_FAILED))
+
describeGroupsResponseData.groups.add(DescribeGroupsResponse.groupError(groupId,
Errors.GROUP_AUTHORIZATION_FAILED))
} else {
val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
val members = summary.members.map { member =>