This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3e4c179592 Add DescribeShareGroupOffsets API [KIP-932] (#18500)
e3e4c179592 is described below

commit e3e4c1795927fcec7e9eb7b7e41795d70bb6a855
Author: Sanskar Jhajharia <sjhajha...@confluent.io>
AuthorDate: Tue Jan 14 20:03:39 2025 +0530

    Add DescribeShareGroupOffsets API [KIP-932] (#18500)
    
    Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Andrew Schofield <aschofi...@confluent.io>
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  3 +-
 .../kafka/common/requests/AbstractRequest.java     |  2 +
 .../kafka/common/requests/AbstractResponse.java    |  2 +
 .../requests/DescribeShareGroupOffsetsRequest.java | 89 ++++++++++++++++++++++
 .../DescribeShareGroupOffsetsResponse.java         | 68 +++++++++++++++++
 .../message/DescribeShareGroupOffsetsRequest.json  | 35 +++++++++
 .../message/DescribeShareGroupOffsetsResponse.json | 54 +++++++++++++
 .../kafka/common/requests/RequestResponseTest.java | 26 +++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   |  8 ++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  3 +
 docs/security.html                                 |  6 ++
 .../apache/kafka/network/RequestConvertToJson.java | 32 +++++---
 12 files changed, 315 insertions(+), 13 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index ed805c941e8..9863aad4022 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -132,7 +132,8 @@ public enum ApiKeys {
     DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
     
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, 
true),
     STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT),
-    STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE);
+    STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
+    DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS);
     
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> 
APIS_BY_LISTENER =
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index c16c71903d6..f2a7c5ee63c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -354,6 +354,8 @@ public abstract class AbstractRequest implements 
AbstractRequestResponse {
                 return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion);
             case STREAMS_GROUP_DESCRIBE:
                 return StreamsGroupDescribeRequest.parse(buffer, apiVersion);
+            case DESCRIBE_SHARE_GROUP_OFFSETS:
+                return DescribeShareGroupOffsetsRequest.parse(buffer, 
apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 8f7b12d4fa4..8f344cb718e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -291,6 +291,8 @@ public abstract class AbstractResponse implements 
AbstractRequestResponse {
                 return StreamsGroupHeartbeatResponse.parse(responseBuffer, 
version);
             case STREAMS_GROUP_DESCRIBE:
                 return StreamsGroupDescribeResponse.parse(responseBuffer, 
version);
+            case DESCRIBE_SHARE_GROUP_OFFSETS:
+                return DescribeShareGroupOffsetsResponse.parse(responseBuffer, 
version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java
new file mode 100644
index 00000000000..072b16e9443
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Request for the DescribeShareGroupOffsets API (KIP-932). Wraps the generated
+ * {@link DescribeShareGroupOffsetsRequestData}, which carries a group id and the
+ * topics/partitions to describe offsets for.
+ */
+public class DescribeShareGroupOffsetsRequest extends AbstractRequest {
+    /** Builds {@link DescribeShareGroupOffsetsRequest} instances around a fixed request payload. */
+    public static class Builder extends AbstractRequest.Builder<DescribeShareGroupOffsetsRequest> {
+
+        private final DescribeShareGroupOffsetsRequestData data;
+
+        /** Creates a builder with {@code enableUnstableLastVersion} set to {@code false}. */
+        public Builder(DescribeShareGroupOffsetsRequestData data) {
+            this(data, false);
+        }
+
+        /**
+         * @param data the request payload
+         * @param enableUnstableLastVersion forwarded unchanged to {@link AbstractRequest.Builder}
+         */
+        public Builder(DescribeShareGroupOffsetsRequestData data, boolean enableUnstableLastVersion) {
+            super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS, enableUnstableLastVersion);
+            this.data = data;
+        }
+
+        @Override
+        public DescribeShareGroupOffsetsRequest build(short version) {
+            return new DescribeShareGroupOffsetsRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final DescribeShareGroupOffsetsRequestData data;
+
+    public DescribeShareGroupOffsetsRequest(DescribeShareGroupOffsetsRequestData data, short version) {
+        super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS, version);
+        this.data = data;
+    }
+
+    /**
+     * Builds a response that echoes every requested topic name and partition index, with each
+     * partition entry carrying the error code derived from {@code e}.
+     *
+     * @param throttleTimeMs the throttle time to report in the response
+     * @param e the failure to translate into a per-partition error code
+     * @return the error response for this request
+     */
+    @Override
+    public DescribeShareGroupOffsetsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        // The error code is identical for every partition, so resolve it once up front.
+        short errorCode = Errors.forException(e).code();
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> results = new ArrayList<>();
+        data.topics().forEach(
+                topicResult -> results.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                        .setTopicName(topicResult.topicName())
+                        .setPartitions(topicResult.partitions().stream()
+                                .map(partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                        .setPartitionIndex(partitionData)
+                                        .setErrorCode(errorCode))
+                                .collect(Collectors.toList()))));
+        // Propagate the caller-supplied throttle time instead of silently dropping it.
+        return new DescribeShareGroupOffsetsResponse(new DescribeShareGroupOffsetsResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setResponses(results));
+    }
+
+    @Override
+    public DescribeShareGroupOffsetsRequestData data() {
+        return data;
+    }
+
+    /**
+     * Deserializes a request of the given version from {@code buffer}.
+     *
+     * @param buffer the buffer holding the serialized request body
+     * @param version the API version used to encode the request
+     */
+    public static DescribeShareGroupOffsetsRequest parse(ByteBuffer buffer, short version) {
+        return new DescribeShareGroupOffsetsRequest(
+                new DescribeShareGroupOffsetsRequestData(new ByteBufferAccessor(buffer), version),
+                version
+        );
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java
new file mode 100644
index 00000000000..183cdb14113
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Response for the DescribeShareGroupOffsets API (KIP-932). Wraps the generated
+ * {@link DescribeShareGroupOffsetsResponseData}, whose results are grouped by topic and then
+ * by partition.
+ */
+public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
+    private final DescribeShareGroupOffsetsResponseData data;
+
+    public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
+        super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+        this.data = data;
+    }
+
+    @Override
+    public DescribeShareGroupOffsetsResponseData data() {
+        return data;
+    }
+
+    /**
+     * Feeds the error code of every partition entry, across all topics, into
+     * {@code updateErrorCounts} and returns the resulting map.
+     */
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        Map<Errors, Integer> tally = new HashMap<>();
+        for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topic : data.responses()) {
+            for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partition : topic.partitions()) {
+                updateErrorCounts(tally, Errors.forCode(partition.errorCode()));
+            }
+        }
+        return tally;
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    /**
+     * Deserializes a response of the given version from {@code buffer}.
+     *
+     * @param buffer the buffer holding the serialized response body
+     * @param version the API version used to encode the response
+     */
+    public static DescribeShareGroupOffsetsResponse parse(ByteBuffer buffer, short version) {
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+        DescribeShareGroupOffsetsResponseData parsed = new DescribeShareGroupOffsetsResponseData(accessor, version);
+        return new DescribeShareGroupOffsetsResponse(parsed);
+    }
+}
diff --git 
a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
 
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
new file mode 100644
index 00000000000..04ed6a910dc
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 90,
+  "type": "request",
+  "listeners": ["broker"],
+  "name": "DescribeShareGroupOffsetsRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "latestVersionUnstable": true,
+  "fields": [
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+      "about": "The group identifier." },
+    { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", 
"versions": "0+",
+      "about": "The topics to describe offsets for.",  "fields": [
+      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": 
"topicName",
+        "about": "The topic name." },
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
+        "about": "The partitions." }
+    ]}
+  ]
+}
diff --git 
a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
 
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
new file mode 100644
index 00000000000..80a541f1a2f
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsResponse.json
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+{
+  "apiKey": 90,
+  "type": "response",
+  "name": "DescribeShareGroupOffsetsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED (version 0+)
+  // - NOT_COORDINATOR (version 0+)
+  // - COORDINATOR_NOT_AVAILABLE (version 0+)
+  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+  // - GROUP_ID_NOT_FOUND (version 0+)
+  // - INVALID_REQUEST (version 0+)
+  // - UNKNOWN_SERVER_ERROR (version 0+)
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", 
"versions": "0+",
+      "about": "The results for each topic.", "fields": [
+      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": 
"topicName",
+        "about": "The topic name." },
+      { "name": "TopicId", "type": "uuid", "versions": "0+",
+        "about": "The unique topic ID." },
+      { "name": "Partitions", "type": 
"[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
+        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
+          "about": "The partition index." },
+        { "name": "StartOffset", "type": "int64", "versions": "0+",
+          "about": "The share-partition start offset." },
+        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
+          "about": "The leader epoch of the partition." },
+        { "name": "ErrorCode", "type": "int16", "versions": "0+",
+          "about": "The error code, or 0 if there was no error." },
+        { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+          "about": "The error message, or null if there was no error." }
+      ]}
+    ]}
+  ]
+}
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c65e75dfa9c..49b3179fa40 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -136,6 +136,8 @@ import 
org.apache.kafka.common.message.DescribeProducersRequestData;
 import org.apache.kafka.common.message.DescribeProducersResponseData;
 import org.apache.kafka.common.message.DescribeQuorumRequestData;
 import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
 import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
 import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
 import org.apache.kafka.common.message.DescribeTransactionsRequestData;
@@ -1074,6 +1076,7 @@ public class RequestResponseTest {
             case READ_SHARE_GROUP_STATE_SUMMARY: return 
createReadShareGroupStateSummaryRequest(version);
             case STREAMS_GROUP_HEARTBEAT: return 
createStreamsGroupHeartbeatRequest(version);
             case STREAMS_GROUP_DESCRIBE: return 
createStreamsGroupDescribeRequest(version);
+            case DESCRIBE_SHARE_GROUP_OFFSETS: return 
createDescribeShareGroupOffsetsRequest(version);
             default: throw new IllegalArgumentException("Unknown API key " + 
apikey);
         }
     }
@@ -1170,6 +1173,7 @@ public class RequestResponseTest {
             case READ_SHARE_GROUP_STATE_SUMMARY: return 
createReadShareGroupStateSummaryResponse();
             case STREAMS_GROUP_HEARTBEAT: return 
createStreamsGroupHeartbeatResponse();
             case STREAMS_GROUP_DESCRIBE: return 
createStreamsGroupDescribeResponse();
+            case DESCRIBE_SHARE_GROUP_OFFSETS: return 
createDescribeShareGroupOffsetsResponse();
             default: throw new IllegalArgumentException("Unknown API key " + 
apikey);
         }
     }
@@ -3953,6 +3957,28 @@ public class RequestResponseTest {
         return new ReadShareGroupStateSummaryResponse(data);
     }
 
+    // Minimal request fixture: group "group" asking about partition 0 of "topic-1".
+    private DescribeShareGroupOffsetsRequest createDescribeShareGroupOffsetsRequest(short version) {
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic topic =
+                new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic();
+        topic.setTopicName("topic-1");
+        topic.setPartitions(Collections.singletonList(0));
+
+        DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData();
+        requestData.setGroupId("group");
+        requestData.setTopics(Collections.singletonList(topic));
+        return new DescribeShareGroupOffsetsRequest.Builder(requestData).build(version);
+    }
+
+    // Minimal response fixture: one successful partition entry (index 0, start offset 0,
+    // leader epoch 0) for "topic-1", the same topic name the request fixture uses.
+    private DescribeShareGroupOffsetsResponse createDescribeShareGroupOffsetsResponse() {
+        DescribeShareGroupOffsetsResponseData data = new DescribeShareGroupOffsetsResponseData()
+                .setResponses(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                        .setTopicName("topic-1")
+                        .setTopicId(Uuid.randomUuid())
+                        .setPartitions(Collections.singletonList(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                .setPartitionIndex(0)
+                                .setErrorCode(Errors.NONE.code())
+                                .setStartOffset(0)
+                                .setLeaderEpoch(0)))));
+        return new DescribeShareGroupOffsetsResponse(data);
+    }
+
     private AbstractRequest createStreamsGroupDescribeRequest(final short 
version) {
         return new StreamsGroupDescribeRequest.Builder(new 
StreamsGroupDescribeRequestData()
             .setGroupIds(Collections.singletonList("group"))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 085df39c886..c1ed5de10cd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -243,6 +243,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.WRITE_SHARE_GROUP_STATE => 
handleWriteShareGroupStateRequest(request)
         case ApiKeys.DELETE_SHARE_GROUP_STATE => 
handleDeleteShareGroupStateRequest(request)
         case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => 
handleReadShareGroupStateSummaryRequest(request)
+        case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS => 
handleDescribeShareGroupOffsetsRequest(request)
         case _ => throw new IllegalStateException(s"No handler for request api 
key ${request.header.apiKey}")
       }
     } catch {
@@ -3520,6 +3521,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
+  def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): 
Unit = {
+    val describeShareGroupOffsetsRequest = 
request.body[DescribeShareGroupOffsetsRequest]
+    // TODO: Implement the DescribeShareGroupOffsetsRequest handling
+    requestHelper.sendMaybeThrottle(request, 
describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    CompletableFuture.completedFuture[Unit](())
+  }
+
   // Visible for Testing
   def 
getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest: 
ShareAcknowledgeRequest,
                                                        topicIdNames: 
util.Map[Uuid, String],
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 8585b5dc2c2..122ee1f6d89 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -746,6 +746,9 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.STREAMS_GROUP_DESCRIBE =>
           new StreamsGroupDescribeRequest.Builder(new 
StreamsGroupDescribeRequestData(), true)
 
+        case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS =>
+          new DescribeShareGroupOffsetsRequest.Builder(new 
DescribeShareGroupOffsetsRequestData(), true)
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
diff --git a/docs/security.html b/docs/security.html
index 7b08458a864..1b82dfc1223 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -2265,6 +2265,12 @@ 
RULE:[n:string](regexp)s/pattern/replacement/g/U</code></pre>
             <td>Cluster</td>
             <td></td>
         </tr>
+        <tr>
+            <td>DESCRIBE_SHARE_GROUP_OFFSETS (90)</td>
+            <td>Read</td>
+            <td>Group</td>
+            <td></td>
+        </tr>
         </tbody>
     </table>
 
diff --git 
a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java 
b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index b4b0cadb16d..d2bda3245f3 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -90,6 +90,8 @@ import 
org.apache.kafka.common.message.DescribeProducersRequestDataJsonConverter
 import 
org.apache.kafka.common.message.DescribeProducersResponseDataJsonConverter;
 import org.apache.kafka.common.message.DescribeQuorumRequestDataJsonConverter;
 import org.apache.kafka.common.message.DescribeQuorumResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseDataJsonConverter;
 import 
org.apache.kafka.common.message.DescribeTopicPartitionsRequestDataJsonConverter;
 import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseDataJsonConverter;
 import 
org.apache.kafka.common.message.DescribeTransactionsRequestDataJsonConverter;
@@ -274,6 +276,8 @@ import 
org.apache.kafka.common.requests.DescribeProducersRequest;
 import org.apache.kafka.common.requests.DescribeProducersResponse;
 import org.apache.kafka.common.requests.DescribeQuorumRequest;
 import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
 import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
 import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
 import org.apache.kafka.common.requests.DescribeTransactionsRequest;
@@ -401,6 +405,8 @@ public class RequestConvertToJson {
                 return 
AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) 
request).data(), request.version());
             case ADD_PARTITIONS_TO_TXN:
                 return 
AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) 
request).data(), request.version());
+            case ADD_RAFT_VOTER:
+                return 
AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) 
request).data(), request.version());
             case ALLOCATE_PRODUCER_IDS:
                 return 
AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) 
request).data(), request.version());
             case ALTER_CLIENT_QUOTAS:
@@ -469,6 +475,8 @@ public class RequestConvertToJson {
                 return 
DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) 
request).data(), request.version());
             case DESCRIBE_QUORUM:
                 return 
DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) 
request).data(), request.version());
+            case DESCRIBE_SHARE_GROUP_OFFSETS:
+                return 
DescribeShareGroupOffsetsRequestDataJsonConverter.write(((DescribeShareGroupOffsetsRequest)
 request).data(), request.version());
             case DESCRIBE_TOPIC_PARTITIONS:
                 return 
DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest)
 request).data(), request.version());
             case DESCRIBE_TRANSACTIONS:
@@ -535,6 +543,8 @@ public class RequestConvertToJson {
                 return 
ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) 
request).data(), request.version());
             case READ_SHARE_GROUP_STATE_SUMMARY:
                 return 
ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest)
 request).data(), request.version());
+            case REMOVE_RAFT_VOTER:
+                return 
RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) 
request).data(), request.version());
             case RENEW_DELEGATION_TOKEN:
                 return 
RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest)
 request).data(), request.version());
             case SASL_AUTHENTICATE:
@@ -565,18 +575,14 @@ public class RequestConvertToJson {
                 return 
UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) 
request).data(), request.version());
             case UPDATE_METADATA:
                 return 
UpdateMetadataRequestDataJsonConverter.write(((UpdateMetadataRequest) 
request).data(), request.version());
+            case UPDATE_RAFT_VOTER:
+                return 
UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) 
request).data(), request.version());
             case VOTE:
                 return VoteRequestDataJsonConverter.write(((VoteRequest) 
request).data(), request.version());
             case WRITE_SHARE_GROUP_STATE:
                 return 
WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest)
 request).data(), request.version());
             case WRITE_TXN_MARKERS:
                 return 
WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) 
request).data(), request.version());
-            case ADD_RAFT_VOTER:
-                return 
AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) 
request).data(), request.version());
-            case REMOVE_RAFT_VOTER:
-                return 
RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) 
request).data(), request.version());
-            case UPDATE_RAFT_VOTER:
-                return 
UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) 
request).data(), request.version());
             default:
                 throw new IllegalStateException("ApiKey " + request.apiKey() + 
" is not currently handled in `request`, the " +
                     "code should be updated to do so.");
@@ -589,6 +595,8 @@ public class RequestConvertToJson {
                 return 
AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) 
response).data(), version);
             case ADD_PARTITIONS_TO_TXN:
                 return 
AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) 
response).data(), version);
+            case ADD_RAFT_VOTER:
+                return 
AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) 
response).data(), version);
             case ALLOCATE_PRODUCER_IDS:
                 return 
AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse)
 response).data(), version);
             case ALTER_CLIENT_QUOTAS:
@@ -657,6 +665,8 @@ public class RequestConvertToJson {
                 return 
DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) 
response).data(), version);
             case DESCRIBE_QUORUM:
                 return 
DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) 
response).data(), version);
+            case DESCRIBE_SHARE_GROUP_OFFSETS:
+                return 
DescribeShareGroupOffsetsResponseDataJsonConverter.write(((DescribeShareGroupOffsetsResponse)
 response).data(), version);
             case DESCRIBE_TOPIC_PARTITIONS:
                 return 
DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse)
 response).data(), version);
             case DESCRIBE_TRANSACTIONS:
@@ -723,6 +733,8 @@ public class RequestConvertToJson {
                 return 
ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse)
 response).data(), version);
             case READ_SHARE_GROUP_STATE_SUMMARY:
                 return 
ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse)
 response).data(), version);
+            case REMOVE_RAFT_VOTER:
+                return 
RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) 
response).data(), version);
             case RENEW_DELEGATION_TOKEN:
                 return 
RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse)
 response).data(), version);
             case SASL_AUTHENTICATE:
@@ -753,18 +765,14 @@ public class RequestConvertToJson {
                 return 
UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) 
response).data(), version);
             case UPDATE_METADATA:
                 return 
UpdateMetadataResponseDataJsonConverter.write(((UpdateMetadataResponse) 
response).data(), version);
+            case UPDATE_RAFT_VOTER:
+                return 
UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) 
response).data(), version);
             case VOTE:
                 return VoteResponseDataJsonConverter.write(((VoteResponse) 
response).data(), version);
             case WRITE_SHARE_GROUP_STATE:
                 return 
WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse)
 response).data(), version);
             case WRITE_TXN_MARKERS:
                 return 
WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) 
response).data(), version);
-            case ADD_RAFT_VOTER:
-                return 
AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) 
response).data(), version);
-            case REMOVE_RAFT_VOTER:
-                return 
RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) 
response).data(), version);
-            case UPDATE_RAFT_VOTER:
-                return 
UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) 
response).data(), version);
             default:
                 throw new IllegalStateException("ApiKey " + response.apiKey() 
+ " is not currently handled in `response`, the " +
                     "code should be updated to do so.");

Reply via email to