dajac commented on code in PR #14124: URL: https://github.com/apache/kafka/pull/14124#discussion_r1311565779
########## clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json: ########## @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 69, + "type": "response", + "name": "ConsumerGroupDescribeResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", + "about": "Each described group.", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The describe error, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group ID string." }, + { "name": "GroupState", "type": "string", "versions": "0+", + "about": "The group state string, or the empty string." }, + { "name": "GroupEpoch", "type": "int32", "versions": "0+", + "about": "The group epoch." }, + { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", + "about": "The assignment epoch." }, + { "name": "AssignorName", "type": "string", "versions": "0+", + "about": "The selected assignor." }, + { "name": "Members", "type": "[]Member", "versions": "0+", + "about": "The members.", + "fields": [ + { "name": "MemberId", "type": "uuid", "versions": "0+", + "about": "The member ID." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member instance ID." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member rack ID." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The current member epoch." }, + { "name": "ClientId", "type": "string", "versions": "0+", + "about": "The client ID." }, + { "name": "ClientHost", "type": "string", "versions": "0+", + "about": "The client host." }, + { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", + "about": "The subscribed topic names." }, + { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "the subscribed topic regex otherwise or null of not provided." }, + { "name": "Assignment", "type": "Assignment", "versions": "0+", + "about": "The current assignment." }, + { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", + "about": "The target assignment." } + ]}, + { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648", + "about": "32-bit bitfield to represent authorized operations for this group." } + ]} Review Comment: nit: ditto. ########## clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponse.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#INVALID_GROUP_ID} + * - {@link Errors#GROUP_ID_NOT_FOUND} + */ +public class ConsumerGroupDescribeResponse extends AbstractResponse { + + private final ConsumerGroupDescribeResponseData data; + protected ConsumerGroupDescribeResponse(ConsumerGroupDescribeResponseData data) { Review Comment: nit: Could we add an empty line before this one? I think that we generally make the constructor public in responses. ########## clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json: ########## @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 69, + "type": "response", + "name": "ConsumerGroupDescribeResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", + "about": "Each described group.", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The describe error, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group ID string." }, + { "name": "GroupState", "type": "string", "versions": "0+", + "about": "The group state string, or the empty string." }, + { "name": "GroupEpoch", "type": "int32", "versions": "0+", + "about": "The group epoch." }, + { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", + "about": "The assignment epoch." }, + { "name": "AssignorName", "type": "string", "versions": "0+", + "about": "The selected assignor." }, + { "name": "Members", "type": "[]Member", "versions": "0+", + "about": "The members.", + "fields": [ + { "name": "MemberId", "type": "uuid", "versions": "0+", + "about": "The member ID." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member instance ID." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member rack ID." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The current member epoch." }, + { "name": "ClientId", "type": "string", "versions": "0+", + "about": "The client ID." }, + { "name": "ClientHost", "type": "string", "versions": "0+", + "about": "The client host." }, + { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", + "about": "The subscribed topic names." }, + { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "the subscribed topic regex otherwise or null of not provided." }, + { "name": "Assignment", "type": "Assignment", "versions": "0+", + "about": "The current assignment." }, + { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", + "about": "The target assignment." } + ]}, + { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648", + "about": "32-bit bitfield to represent authorized operations for this group." } + ]} + ], + "commonStructs": [ + { "name": "TopicPartitions", "versions": "0+", "fields": [ + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The topic ID." }, + { "name": "TopicName", "type": "string", "versions": "0+", + "about": "The topic name."}, Review Comment: nit: There is an extra space after `:` and a missing space before `}`. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3661,6 +3662,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { + // TODO: This method is Work-In-Progress + // It will be finished in the second PR of KAFKA-14509 Review Comment: nit: We usually try to avoid leaving TODOs in the code. I think that we can remove it as we know that we will do this in the second part of this JIRA. ########## clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeResponse.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#INVALID_GROUP_ID} + * - {@link Errors#GROUP_ID_NOT_FOUND} + */ +public class ConsumerGroupDescribeResponse extends AbstractResponse { + + private final ConsumerGroupDescribeResponseData data; + protected ConsumerGroupDescribeResponse(ConsumerGroupDescribeResponseData data) { + super(ApiKeys.CONSUMER_GROUP_DESCRIBE); + this.data = data; + } + + @Override + public ConsumerGroupDescribeResponseData data() { + return data; + } + + @Override + public Map<Errors, Integer> errorCounts() { Review Comment: Should we add a unit test for this method? ########## clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java: ########## @@ -1132,10 +1135,24 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) { case LIST_TRANSACTIONS: return createListTransactionsResponse(); case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsResponse(); case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatResponse(); + case CONSUMER_GROUP_DESCRIBE: return createConsumerGroupDescribeResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } + private ConsumerGroupDescribeRequest createConsumerGroupDescribeRequest(short version) { + ConsumerGroupDescribeRequestData data = new ConsumerGroupDescribeRequestData() + .setGroupIds(Arrays.asList("group")) + .setIncludeAuthorizedOperations(false); + return new ConsumerGroupDescribeRequest.Builder(data).build(version); + } + + private ConsumerGroupDescribeResponse createConsumerGroupDescribeResponse() { + ConsumerGroupDescribeResponseData data = new ConsumerGroupDescribeResponseData() + .setThrottleTimeMs(1000); Review Comment: Should we put at least one group in the response? ########## clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json: ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 69, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "ConsumerGroupDescribeRequest", + "validVersions": "0", + // The ConsumerGroupDescribe API is added as part of KIP-848 and is still + // under development. Hence, the API is not exposed by default by brokers + // unless explicitly enabled. + "latestVersionUnstable": true, + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", + "about": "The names of the groups to describe" }, Review Comment: nit: Should we use `ids` instead of `names`? ########## clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json: ########## @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 69, + "type": "response", + "name": "ConsumerGroupDescribeResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", + "about": "Each described group.", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The describe error, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group ID string." }, + { "name": "GroupState", "type": "string", "versions": "0+", + "about": "The group state string, or the empty string." }, + { "name": "GroupEpoch", "type": "int32", "versions": "0+", + "about": "The group epoch." }, + { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", + "about": "The assignment epoch." }, + { "name": "AssignorName", "type": "string", "versions": "0+", + "about": "The selected assignor." }, + { "name": "Members", "type": "[]Member", "versions": "0+", + "about": "The members.", + "fields": [ + { "name": "MemberId", "type": "uuid", "versions": "0+", + "about": "The member ID." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member instance ID." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member rack ID." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The current member epoch." }, + { "name": "ClientId", "type": "string", "versions": "0+", + "about": "The client ID." }, + { "name": "ClientHost", "type": "string", "versions": "0+", + "about": "The client host." }, + { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", + "about": "The subscribed topic names." }, + { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "the subscribed topic regex otherwise or null of not provided." }, + { "name": "Assignment", "type": "Assignment", "versions": "0+", + "about": "The current assignment." }, + { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", + "about": "The target assignment." } + ]}, + { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648", + "about": "32-bit bitfield to represent authorized operations for this group." } + ]} + ], + "commonStructs": [ + { "name": "TopicPartitions", "versions": "0+", "fields": [ + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The topic ID." }, + { "name": "TopicName", "type": "string", "versions": "0+", + "about": "The topic name."}, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions." } + ]}, + { "name": "Assignment", "versions": "0+", "fields": [ + { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", + "about": "The assigned topic-partitions to the member." }, Review Comment: nit: There is an extra space. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3661,6 +3662,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { Review Comment: Should we add a unit test in KafkaApisTest to ensure that the basic plumbing works? ########## clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json: ########## @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 69, + "type": "response", + "name": "ConsumerGroupDescribeResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - INVALID_GROUP_ID (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", + "about": "Each described group.", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The describe error, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." }, + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group ID string." }, + { "name": "GroupState", "type": "string", "versions": "0+", + "about": "The group state string, or the empty string." }, + { "name": "GroupEpoch", "type": "int32", "versions": "0+", + "about": "The group epoch." }, + { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", + "about": "The assignment epoch." }, + { "name": "AssignorName", "type": "string", "versions": "0+", + "about": "The selected assignor." }, + { "name": "Members", "type": "[]Member", "versions": "0+", + "about": "The members.", + "fields": [ + { "name": "MemberId", "type": "uuid", "versions": "0+", + "about": "The member ID." }, + { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member instance ID." }, + { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The member rack ID." }, + { "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The current member epoch." }, + { "name": "ClientId", "type": "string", "versions": "0+", + "about": "The client ID." }, + { "name": "ClientHost", "type": "string", "versions": "0+", + "about": "The client host." }, + { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", + "about": "The subscribed topic names." }, + { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "the subscribed topic regex otherwise or null of not provided." }, + { "name": "Assignment", "type": "Assignment", "versions": "0+", + "about": "The current assignment." }, + { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", + "about": "The target assignment." } + ]}, Review Comment: nit: Should we align this on `fields`? ########## clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequest.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData; +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class ConsumerGroupDescribeRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder<ConsumerGroupDescribeRequest> { + + private final ConsumerGroupDescribeRequestData data; + + public Builder(ConsumerGroupDescribeRequestData data) { + this(data, false); + } + + public Builder(ConsumerGroupDescribeRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.CONSUMER_GROUP_DESCRIBE, enableUnstableLastVersion); + this.data = data; + } + + @Override + public ConsumerGroupDescribeRequest build(short version) { + return new ConsumerGroupDescribeRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final ConsumerGroupDescribeRequestData data; + + public ConsumerGroupDescribeRequest(ConsumerGroupDescribeRequestData data, short version) { + super(ApiKeys.CONSUMER_GROUP_DESCRIBE, version); + this.data = data; + } + + @Override + public ConsumerGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) { Review Comment: Should we add a unit test for this method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org