This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e3e4c179592 Add DescribeShareGroupOffsets API [KIP-932] (#18500) e3e4c179592 is described below commit e3e4c1795927fcec7e9eb7b7e41795d70bb6a855 Author: Sanskar Jhajharia <sjhajha...@confluent.io> AuthorDate: Tue Jan 14 20:03:39 2025 +0530 Add DescribeShareGroupOffsets API [KIP-932] (#18500) Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Andrew Schofield <aschofi...@confluent.io> --- .../org/apache/kafka/common/protocol/ApiKeys.java | 3 +- .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/AbstractResponse.java | 2 + .../requests/DescribeShareGroupOffsetsRequest.java | 89 ++++++++++++++++++++++ .../DescribeShareGroupOffsetsResponse.java | 68 +++++++++++++++++ .../message/DescribeShareGroupOffsetsRequest.json | 35 +++++++++ .../message/DescribeShareGroupOffsetsResponse.json | 54 +++++++++++++ .../kafka/common/requests/RequestResponseTest.java | 26 +++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++ .../scala/unit/kafka/server/RequestQuotaTest.scala | 3 + docs/security.html | 6 ++ .../apache/kafka/network/RequestConvertToJson.java | 32 +++++--- 12 files changed, 315 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index ed805c941e8..9863aad4022 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -132,7 +132,8 @@ public enum ApiKeys { DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true), READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true), STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT), - STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE); + STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE), + DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS); private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER = diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index c16c71903d6..f2a7c5ee63c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -354,6 +354,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse { return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion); case STREAMS_GROUP_DESCRIBE: return StreamsGroupDescribeRequest.parse(buffer, apiVersion); + case DESCRIBE_SHARE_GROUP_OFFSETS: + return DescribeShareGroupOffsetsRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 8f7b12d4fa4..8f344cb718e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -291,6 +291,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse { return StreamsGroupHeartbeatResponse.parse(responseBuffer, version); case STREAMS_GROUP_DESCRIBE: return StreamsGroupDescribeResponse.parse(responseBuffer, version); + case DESCRIBE_SHARE_GROUP_OFFSETS: + return DescribeShareGroupOffsetsResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java new file mode 100644 index 00000000000..072b16e9443 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class DescribeShareGroupOffsetsRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder<DescribeShareGroupOffsetsRequest> { + + private final DescribeShareGroupOffsetsRequestData data; + + public Builder(DescribeShareGroupOffsetsRequestData data) { + this(data, false); + } + + public Builder(DescribeShareGroupOffsetsRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS, enableUnstableLastVersion); + this.data = data; + } + + @Override + public DescribeShareGroupOffsetsRequest build(short version) { + return new DescribeShareGroupOffsetsRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final DescribeShareGroupOffsetsRequestData data; + + public DescribeShareGroupOffsetsRequest(DescribeShareGroupOffsetsRequestData data, short version) { + super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS, version); + this.data = data; + } + + @Override + public DescribeShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Throwable e) { + List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> results = new ArrayList<>(); + data.topics().forEach( + topicResult -> results.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(topicResult.topicName()) + .setPartitions(topicResult.partitions().stream() + .map(partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partitionData) + .setErrorCode(Errors.forException(e).code())) + .collect(Collectors.toList())))); + return new DescribeShareGroupOffsetsResponse(new DescribeShareGroupOffsetsResponseData() + .setResponses(results)); + } + + @Override + public DescribeShareGroupOffsetsRequestData data() { + return data; + } + + public static DescribeShareGroupOffsetsRequest parse(ByteBuffer buffer, short version) { + return new DescribeShareGroupOffsetsRequest( + new DescribeShareGroupOffsetsRequestData(new ByteBufferAccessor(buffer), version), + version + ); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java new file mode 100644 index 00000000000..183cdb14113 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class DescribeShareGroupOffsetsResponse extends AbstractResponse { + private final DescribeShareGroupOffsetsResponseData data; + + public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) { + super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS); + this.data = data; + } + + @Override + public DescribeShareGroupOffsetsResponseData data() { + return data; + } + + @Override + public Map<Errors, Integer> errorCounts() { + Map<Errors, Integer> counts = new HashMap<>(); + data.responses().forEach( + result -> result.partitions().forEach( + partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode())) + ) + ); + return counts; + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static DescribeShareGroupOffsetsResponse parse(ByteBuffer buffer, short version) { + return new DescribeShareGroupOffsetsResponse( + new DescribeShareGroupOffsetsResponseData(new ByteBufferAccessor(buffer), version) + ); + } +} diff --git a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json new file mode 100644 index 00000000000..04ed6a910dc --- /dev/null +++ b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 90, + "type": "request", + "listeners": ["broker"], + "name": "DescribeShareGroupOffsetsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "latestVersionUnstable": true, + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group identifier." }, + { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+", + "about": "The topics to describe offsets for.", "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions." } + ]} + ] +} diff --git a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json new file mode 100644 index 00000000000..80a541f1a2f --- /dev/null +++ b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +{ + "apiKey": 90, + "type": "response", + "name": "DescribeShareGroupOffsetsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - GROUP_ID_NOT_FOUND (version 0+) + // - INVALID_REQUEST (version 0+) + // - UNKNOWN_SERVER_ERROR (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+", + "about": "The results for each topic.", "fields": [ + { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", + "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The unique topic ID." }, + { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [ + { "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "StartOffset", "type": "int64", "versions": "0+", + "about": "The share-partition start offset." }, + { "name": "LeaderEpoch", "type": "int32", "versions": "0+", + "about": "The leader epoch of the partition." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The error message, or null if there was no error." } + ]} + ]} + ] +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c65e75dfa9c..49b3179fa40 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -136,6 +136,8 @@ import org.apache.kafka.common.message.DescribeProducersRequestData; import org.apache.kafka.common.message.DescribeProducersResponseData; import org.apache.kafka.common.message.DescribeQuorumRequestData; import org.apache.kafka.common.message.DescribeQuorumResponseData; +import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; +import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData; import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData; import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; import org.apache.kafka.common.message.DescribeTransactionsRequestData; @@ -1074,6 +1076,7 @@ public class RequestResponseTest { case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version); case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatRequest(version); case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeRequest(version); + case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1170,6 +1173,7 @@ public class RequestResponseTest { case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse(); case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse(); case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse(); + case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -3953,6 +3957,28 @@ public class RequestResponseTest { return new ReadShareGroupStateSummaryResponse(data); } + private DescribeShareGroupOffsetsRequest createDescribeShareGroupOffsetsRequest(short version) { + DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData() + .setGroupId("group") + .setTopics(Collections.singletonList(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName("topic-1") + .setPartitions(Collections.singletonList(0)))); + return new DescribeShareGroupOffsetsRequest.Builder(data).build(version); + } + + private DescribeShareGroupOffsetsResponse createDescribeShareGroupOffsetsResponse() { + DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData() + .setResponses(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName("group") + .setTopicId(Uuid.randomUuid()) + .setPartitions(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code()) + .setStartOffset(0) + .setLeaderEpoch(0))))); + return new DescribeShareGroupOffsetsResponse(data); + } + private AbstractRequest createStreamsGroupDescribeRequest(final short version) { return new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData() .setGroupIds(Collections.singletonList("group")) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 085df39c886..c1ed5de10cd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -243,6 +243,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request) case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request) case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request) + case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => handleDescribeShareGroupOffsetsRequest(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3520,6 +3521,13 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { + val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest] + // TODO: Implement the DescribeShareGroupOffsetsRequest handling + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } + // Visible for Testing def getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest: ShareAcknowledgeRequest, topicIdNames: util.Map[Uuid, String], diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 8585b5dc2c2..122ee1f6d89 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -746,6 +746,9 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.STREAMS_GROUP_DESCRIBE => new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData(), true) + case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => + new DescribeShareGroupOffsetsRequest.Builder(new DescribeShareGroupOffsetsRequestData(), true) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } diff --git a/docs/security.html b/docs/security.html index 7b08458a864..1b82dfc1223 100644 --- a/docs/security.html +++ b/docs/security.html @@ -2265,6 +2265,12 @@ RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre> <td>Cluster</td> <td></td> </tr> + <tr> + <td>DESCRIBE_SHARE_GROUP_OFFSETS (90)</td> + <td>Read</td> + <td>Group</td> + <td></td> + </tr> </tbody> </table> diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index b4b0cadb16d..d2bda3245f3 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -90,6 +90,8 @@ import org.apache.kafka.common.message.DescribeProducersRequestDataJsonConverter import org.apache.kafka.common.message.DescribeProducersResponseDataJsonConverter; import org.apache.kafka.common.message.DescribeQuorumRequestDataJsonConverter; import org.apache.kafka.common.message.DescribeQuorumResponseDataJsonConverter; +import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestDataJsonConverter; +import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseDataJsonConverter; import org.apache.kafka.common.message.DescribeTopicPartitionsRequestDataJsonConverter; import org.apache.kafka.common.message.DescribeTopicPartitionsResponseDataJsonConverter; import org.apache.kafka.common.message.DescribeTransactionsRequestDataJsonConverter; @@ -274,6 +276,8 @@ import org.apache.kafka.common.requests.DescribeProducersRequest; import org.apache.kafka.common.requests.DescribeProducersResponse; import org.apache.kafka.common.requests.DescribeQuorumRequest; import org.apache.kafka.common.requests.DescribeQuorumResponse; +import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest; +import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse; import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest; import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse; import org.apache.kafka.common.requests.DescribeTransactionsRequest; @@ -401,6 +405,8 @@ public class RequestConvertToJson { return AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) request).data(), request.version()); case ADD_PARTITIONS_TO_TXN: return AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) request).data(), request.version()); + case ADD_RAFT_VOTER: + return AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), request.version()); case ALLOCATE_PRODUCER_IDS: return AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) request).data(), request.version()); case ALTER_CLIENT_QUOTAS: @@ -469,6 +475,8 @@ public class RequestConvertToJson { return DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) request).data(), request.version()); case DESCRIBE_QUORUM: return DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), request.version()); + case DESCRIBE_SHARE_GROUP_OFFSETS: + return DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest) request).data(), request.version()); case DESCRIBE_TOPIC_PARTITIONS: return DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), request.version()); case DESCRIBE_TRANSACTIONS: @@ -535,6 +543,8 @@ public class RequestConvertToJson { return ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) request).data(), request.version()); case READ_SHARE_GROUP_STATE_SUMMARY: return ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest) request).data(), request.version()); + case REMOVE_RAFT_VOTER: + return RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), request.version()); case RENEW_DELEGATION_TOKEN: return RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest) request).data(), request.version()); case SASL_AUTHENTICATE: @@ -565,18 +575,14 @@ public class RequestConvertToJson { return UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), request.version()); case UPDATE_METADATA: return UpdateMetadataRequestDataJsonConverter.write(((UpdateMetadataRequest) request).data(), request.version()); + case UPDATE_RAFT_VOTER: + return UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version()); case VOTE: return VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), request.version()); case WRITE_SHARE_GROUP_STATE: return WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), request.version()); case WRITE_TXN_MARKERS: return WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), request.version()); - case ADD_RAFT_VOTER: - return AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), request.version()); - case REMOVE_RAFT_VOTER: - return RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), request.version()); - case UPDATE_RAFT_VOTER: - return UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), request.version()); default: throw new IllegalStateException("ApiKey " + request.apiKey() + " is not currently handled in `request`, the " + "code should be updated to do so."); @@ -589,6 +595,8 @@ public class RequestConvertToJson { return AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) response).data(), version); case ADD_PARTITIONS_TO_TXN: return AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) response).data(), version); + case ADD_RAFT_VOTER: + return AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) response).data(), version); case ALLOCATE_PRODUCER_IDS: return AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse) response).data(), version); case ALTER_CLIENT_QUOTAS: @@ -657,6 +665,8 @@ public class RequestConvertToJson { return DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) response).data(), version); case DESCRIBE_QUORUM: return DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) response).data(), version); + case DESCRIBE_SHARE_GROUP_OFFSETS: + return DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse) response).data(), version); case DESCRIBE_TOPIC_PARTITIONS: return DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse) response).data(), version); case DESCRIBE_TRANSACTIONS: @@ -723,6 +733,8 @@ public class RequestConvertToJson { return ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse) response).data(), version); case READ_SHARE_GROUP_STATE_SUMMARY: return ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse) response).data(), version); + case REMOVE_RAFT_VOTER: + return RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) response).data(), version); case RENEW_DELEGATION_TOKEN: return RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse) response).data(), version); case SASL_AUTHENTICATE: @@ -753,18 +765,14 @@ public class RequestConvertToJson { return UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) response).data(), version); case UPDATE_METADATA: return UpdateMetadataResponseDataJsonConverter.write(((UpdateMetadataResponse) response).data(), version); + case UPDATE_RAFT_VOTER: + return UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version); case VOTE: return VoteResponseDataJsonConverter.write(((VoteResponse) response).data(), version); case WRITE_SHARE_GROUP_STATE: return WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse) response).data(), version); case WRITE_TXN_MARKERS: return WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) response).data(), version); - case ADD_RAFT_VOTER: - return AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) response).data(), version); - case REMOVE_RAFT_VOTER: - return RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) response).data(), version); - case UPDATE_RAFT_VOTER: - return UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) response).data(), version); default: throw new IllegalStateException("ApiKey " + response.apiKey() + " is not currently handled in `response`, the " + "code should be updated to do so.");