This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7997c9ebe0c KAFKA-20620: Add StreamsGroupTopologyDescriptionUpdate RPC schema and extend StreamsGroupDescribe/Heartbeat (#22397)
7997c9ebe0c is described below

commit 7997c9ebe0c651432b28c7e7a7dc62ce456b33b4
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Jun 3 11:05:03 2026 +0200

    KAFKA-20620: Add StreamsGroupTopologyDescriptionUpdate RPC schema and extend StreamsGroupDescribe/Heartbeat (#22397)
    
    First sub-task of
    [KIP-1331](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331%3A+Streams+Group+Topology+Description+Plugin)
    (Streams Group Topology Description Plugin). Adds the wire-format
    scaffolding only: the broker handler is a stub that returns
    UNSUPPORTED_VERSION and there is no client logic yet (the real handler
    and the client logic land in later tickets).
    
    - New RPC `StreamsGroupTopologyDescriptionUpdate` (apiKey 93):
    request/response schemas + Java wrappers, wired through `ApiKeys`,
    `AbstractRequest`, `AbstractResponse`, and `RequestConvertToJson`.
    - New error codes: `GROUP_DELETION_FAILED` (134),
    `STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED` (135), with matching
    exception classes.
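    - `StreamsGroupHeartbeatResponse`: bumped to v1, adding
    `TopologyDescriptionRequired`. `StreamsGroupHeartbeatRequest` is bumped
    to v1 as well (body unchanged) so that the new response version is
    negotiated.
    - `StreamsGroupDescribeRequest`: bumped to v1, adding
    `IncludeTopologyDescription`.
    - `StreamsGroupDescribeResponse`: bumped to v1, adding
    `TopologyDescription` + `TopologyDescriptionStatus`.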
    - `DeleteGroups{Request,Response}` bumped to v3 with per-group
    `ErrorMessage`.
    
    Reviewers: Lucas Brutschy <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../errors/GroupDeletionFailedException.java       |  31 +++++
 ...msTopologyDescriptionUpdateFailedException.java |  32 +++++
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   3 +-
 .../org/apache/kafka/common/protocol/Errors.java   |   6 +-
 .../kafka/common/requests/AbstractRequest.java     |   2 +
 .../kafka/common/requests/AbstractResponse.java    |   2 +
 .../requests/StreamsGroupDescribeResponse.java     |   6 +
 ...reamsGroupTopologyDescriptionUpdateRequest.java |  83 +++++++++++
 ...amsGroupTopologyDescriptionUpdateResponse.java} |  31 ++---
 .../common/message/DeleteGroupsRequest.json        |   6 +-
 .../common/message/DeleteGroupsResponse.json       |   9 +-
 .../message/StreamsGroupDescribeRequest.json       |  11 +-
 .../message/StreamsGroupDescribeResponse.json      |  45 +++++-
 .../message/StreamsGroupHeartbeatRequest.json      |   7 +-
 .../message/StreamsGroupHeartbeatResponse.json     |   6 +-
 ...reamsGroupTopologyDescriptionUpdateRequest.json |  71 ++++++++++
 ...amsGroupTopologyDescriptionUpdateResponse.json} |  33 +++--
 .../kafka/common/requests/RequestResponseTest.java | 152 +++++++++++++++++++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  10 ++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   3 +
 .../apache/kafka/network/RequestConvertToJson.java |   8 ++
 21 files changed, 501 insertions(+), 56 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java
new file mode 100644
index 00000000000..eaa83160526
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupDeletionFailedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that {@code DeleteGroups} could not complete for the affected 
group. The
+ * accompanying error message describes the underlying cause; the caller may 
retry once
+ * the underlying condition is resolved.
+ */
+public class GroupDeletionFailedException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public GroupDeletionFailedException(String message) {
+        super(message);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java
new file mode 100644
index 00000000000..df981b8b5b9
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsTopologyDescriptionUpdateFailedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Indicates that the streams group topology description plugin failed to 
process
+ * a StreamsGroupTopologyDescriptionUpdate request. The accompanying error 
message
+ * describes the underlying cause; the broker tracks the transient-vs-permanent
+ * distinction internally and does not reflect it on the wire.
+ */
+public class StreamsTopologyDescriptionUpdateFailedException extends 
ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public StreamsTopologyDescriptionUpdateFailedException(String message) {
+        super(message);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 79b283b4f8d..22dc37e67d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -137,7 +137,8 @@ public enum ApiKeys {
     STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
     DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
     ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
-    DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
+    DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
+    
STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE(ApiMessageType.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> 
APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a27a7fcf23c..15ac7765a29 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -45,6 +45,7 @@ import 
org.apache.kafka.common.errors.FencedStateEpochException;
 import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
 import org.apache.kafka.common.errors.FetchSessionTopicIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupDeletionFailedException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
@@ -122,6 +123,7 @@ import 
org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
 import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import 
org.apache.kafka.common.errors.StreamsTopologyDescriptionUpdateFailedException;
 import org.apache.kafka.common.errors.StreamsTopologyFencedException;
 import org.apache.kafka.common.errors.TelemetryTooLargeException;
 import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
@@ -418,7 +420,9 @@ public enum Errors {
     STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", 
StreamsInvalidTopologyException::new),
     STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is 
invalid.", StreamsInvalidTopologyEpochException::new),
     STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", 
StreamsTopologyFencedException::new),
-    SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been 
reached.", ShareSessionLimitReachedException::new);
+    SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been 
reached.", ShareSessionLimitReachedException::new),
+    GROUP_DELETION_FAILED(134, "DeleteGroups could not complete; see the error 
message on the per-group result for details.", 
GroupDeletionFailedException::new),
+    STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135, "The broker could not 
process the topology description update; see the error message for details.", 
StreamsTopologyDescriptionUpdateFailedException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 750de2050f4..8630c379ed6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -354,6 +354,8 @@ public abstract class AbstractRequest implements 
AbstractRequestResponse {
                 return AlterShareGroupOffsetsRequest.parse(readable, 
apiVersion);
             case DELETE_SHARE_GROUP_OFFSETS:
                 return DeleteShareGroupOffsetsRequest.parse(readable, 
apiVersion);
+            case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE:
+                return 
StreamsGroupTopologyDescriptionUpdateRequest.parse(readable, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 7d96d1a1731..961559d9e8f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -290,6 +290,8 @@ public abstract class AbstractResponse implements 
AbstractRequestResponse {
                 return AlterShareGroupOffsetsResponse.parse(readable, version);
             case DELETE_SHARE_GROUP_OFFSETS:
                 return DeleteShareGroupOffsetsResponse.parse(readable, 
version);
+            case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE:
+                return 
StreamsGroupTopologyDescriptionUpdateResponse.parse(readable, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
index efee6e521f4..8423b0b69ec 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
@@ -38,6 +38,12 @@ import java.util.Map;
  */
 public class StreamsGroupDescribeResponse extends AbstractResponse {
 
+    // TopologyDescriptionStatus int8 values (v1+). These values must not 
change.
+    public static final byte TOPOLOGY_DESCRIPTION_STATUS_NOT_REQUESTED = 0;
+    public static final byte TOPOLOGY_DESCRIPTION_STATUS_NOT_STORED = 1;
+    public static final byte TOPOLOGY_DESCRIPTION_STATUS_ERROR = 2;
+    public static final byte TOPOLOGY_DESCRIPTION_STATUS_AVAILABLE = 3;
+
     private final StreamsGroupDescribeResponseData data;
 
     public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data) 
{
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java
new file mode 100644
index 00000000000..d476f7ac44d
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateRequest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Readable;
+
+/**
+ * Sent by a Streams client to push its topology description to the broker, in 
response
+ * to {@code TopologyDescriptionRequired=true} on a {@code 
StreamsGroupHeartbeatResponse}.
+ * The broker validates that {@code MemberId} still belongs to the group, 
checks the
+ * {@code TopologyEpoch} against the group's current epoch, and persists the 
description.
+ * See KIP-1331.
+ *
+ * <p>Legal error codes are documented on {@link 
StreamsGroupTopologyDescriptionUpdateResponse}.
+ */
+public class StreamsGroupTopologyDescriptionUpdateRequest extends 
AbstractRequest {
+
+    public static class Builder extends 
AbstractRequest.Builder<StreamsGroupTopologyDescriptionUpdateRequest> {
+        private final StreamsGroupTopologyDescriptionUpdateRequestData data;
+
+        public Builder(StreamsGroupTopologyDescriptionUpdateRequestData data) {
+            // The schema is marked latestVersionUnstable until the broker 
handler lands; opt in
+            // here so the Builder can still construct the only existing 
version.
+            super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE, true);
+            this.data = data;
+        }
+
+        @Override
+        public StreamsGroupTopologyDescriptionUpdateRequest build(short 
version) {
+            return new StreamsGroupTopologyDescriptionUpdateRequest(data, 
version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final StreamsGroupTopologyDescriptionUpdateRequestData data;
+
+    public 
StreamsGroupTopologyDescriptionUpdateRequest(StreamsGroupTopologyDescriptionUpdateRequestData
 data, short version) {
+        super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE, version);
+        this.data = data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);
+        return new StreamsGroupTopologyDescriptionUpdateResponse(
+            new StreamsGroupTopologyDescriptionUpdateResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(apiError.error().code())
+                .setErrorMessage(apiError.message())
+        );
+    }
+
+    @Override
+    public StreamsGroupTopologyDescriptionUpdateRequestData data() {
+        return data;
+    }
+
+    public static StreamsGroupTopologyDescriptionUpdateRequest parse(Readable 
readable, short version) {
+        return new StreamsGroupTopologyDescriptionUpdateRequest(
+            new StreamsGroupTopologyDescriptionUpdateRequestData(readable, 
version), version);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java
similarity index 63%
copy from 
clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
copy to 
clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java
index efee6e521f4..fb93bf49ccd 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupTopologyDescriptionUpdateResponse.java
@@ -16,12 +16,11 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.EnumMap;
 import java.util.Map;
 
 /**
@@ -32,31 +31,28 @@ import java.util.Map;
  * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
  * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
  * - {@link Errors#INVALID_REQUEST}
- * - {@link Errors#INVALID_GROUP_ID}
+ * - {@link Errors#UNSUPPORTED_VERSION}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
  * - {@link Errors#GROUP_ID_NOT_FOUND}
- * - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
+ * - {@link Errors#STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED}
  */
-public class StreamsGroupDescribeResponse extends AbstractResponse {
+public class StreamsGroupTopologyDescriptionUpdateResponse extends 
AbstractResponse {
 
-    private final StreamsGroupDescribeResponseData data;
+    private final StreamsGroupTopologyDescriptionUpdateResponseData data;
 
-    public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data) 
{
-        super(ApiKeys.STREAMS_GROUP_DESCRIBE);
+    public 
StreamsGroupTopologyDescriptionUpdateResponse(StreamsGroupTopologyDescriptionUpdateResponseData
 data) {
+        super(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE);
         this.data = data;
     }
 
     @Override
-    public StreamsGroupDescribeResponseData data() {
+    public StreamsGroupTopologyDescriptionUpdateResponseData data() {
         return data;
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
-        data.groups().forEach(
-            group -> updateErrorCounts(counts, 
Errors.forCode(group.errorCode()))
-        );
-        return counts;
+        return errorCounts(Errors.forCode(data.errorCode()));
     }
 
     @Override
@@ -69,9 +65,8 @@ public class StreamsGroupDescribeResponse extends 
AbstractResponse {
         data.setThrottleTimeMs(throttleTimeMs);
     }
 
-    public static StreamsGroupDescribeResponse parse(Readable readable, short 
version) {
-        return new StreamsGroupDescribeResponse(
-            new StreamsGroupDescribeResponseData(readable, version)
-        );
+    public static StreamsGroupTopologyDescriptionUpdateResponse parse(Readable 
readable, short version) {
+        return new StreamsGroupTopologyDescriptionUpdateResponse(
+            new StreamsGroupTopologyDescriptionUpdateResponseData(readable, 
version));
     }
 }
diff --git a/clients/src/main/resources/common/message/DeleteGroupsRequest.json 
b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
index 7d7c4371789..5c55a9939b7 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
@@ -21,7 +21,11 @@
   // Version 1 is the same as version 0.
   //
   // Version 2 is the first flexible version.
-  "validVersions": "0-2",
+  //
+  // Version 3 corresponds to the addition of an ErrorMessage field on each 
per-group
+  // result in the response (populated when the broker needs to surface a 
cause to the
+  // caller). The request body shape is unchanged at version 3.
+  "validVersions": "0-3",
   "flexibleVersions": "2+",
   "fields": [
     { "name": "GroupsNames", "type": "[]string", "versions": "0+", 
"entityType": "groupId",
diff --git 
a/clients/src/main/resources/common/message/DeleteGroupsResponse.json 
b/clients/src/main/resources/common/message/DeleteGroupsResponse.json
index 168cde03ba3..4f5ad712a63 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteGroupsResponse.json
@@ -20,7 +20,10 @@
   // Starting in version 1, on quota violation, brokers send out responses 
before throttling.
   //
   // Version 2 is the first flexible version.
-  "validVersions": "0-2",
+  //
+  // Version 3 adds the per-group ErrorMessage field so the broker can surface 
a cause
+  // string when GROUP_DELETION_FAILED (or any other non-NONE error) is 
returned for a group.
+  "validVersions": "0-3",
   "flexibleVersions": "2+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -30,7 +33,9 @@
       { "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "groupId",
         "about": "The group id." },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
-        "about": "The deletion error, or 0 if the deletion succeeded." }
+        "about": "The deletion error, or 0 if the deletion succeeded." },
+      { "name": "ErrorMessage", "type": "string", "versions": "3+", 
"nullableVersions": "3+", "ignorable": true, "default": "null",
+        "about": "The error message, or null if there was no error." }
     ]}
   ]
 }
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
index 6e36479043a..7d3741f1d33 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -18,12 +18,17 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "StreamsGroupDescribeRequest",
-  "validVersions": "0",
+  // Version 1 adds IncludeTopologyDescription (KIP-1331). Marked unstable 
until the broker
+  // topology-description manager lands; flip to false before release.
+  "latestVersionUnstable": true,
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": 
"groupId",
       "about": "The ids of the groups to describe" },
     { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
-      "about": "Whether to include authorized operations." }
+      "about": "Whether to include authorized operations." },
+    { "name": "IncludeTopologyDescription", "type": "bool", "versions": "1+", 
"default": "false",
+      "about": "Whether to include the full topology description from the 
topology description plugin in the response." }
   ]
-}
\ No newline at end of file
+}
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index b99f9c00b08..8b6eb165e18 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -17,7 +17,8 @@
   "apiKey": 89,
   "type": "response",
   "name": "StreamsGroupDescribeResponse",
-  "validVersions": "0",
+  // Version 1 adds TopologyDescription and TopologyDescriptionStatus 
(KIP-1331).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -106,7 +107,12 @@
               "about": "True for classic members that have not been upgraded 
yet." }
           ]},
         { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", 
"default": "-2147483648",
-          "about": "32-bit bitfield to represent authorized operations for 
this group." }
+          "about": "32-bit bitfield to represent authorized operations for 
this group." },
+        { "name": "TopologyDescription", "type": "TopologyDescription", 
"versions": "1+",
+          "nullableVersions": "1+", "default": "null",
+          "about": "The full topology description for this group. Non-null if 
and only if TopologyDescriptionStatus is AVAILABLE (3); null otherwise." },
+        { "name": "TopologyDescriptionStatus", "type": "int8", "versions": 
"1+", "default": "0",
+          "about": "The status of the topology description for this group, 
paired with TopologyDescription: 0=NOT_REQUESTED (client did not set 
IncludeTopologyDescription; TopologyDescription is null); 1=NOT_STORED (no 
description recorded for this group; TopologyDescription is null); 2=ERROR 
(broker failed to fetch the description, see broker logs; TopologyDescription 
is null); 3=AVAILABLE (TopologyDescription is non-null and carries the 
description)." }
       ]
     }
   ],
@@ -155,6 +161,39 @@
       { "name": "TopicConfigs", "type": "[]KeyValue", "versions": "0+",
         "about": "Topic-level configurations as key-value pairs."
       }
+    ]},
+    { "name": "TopologyDescription", "versions": "1+", "fields": [
+      { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", 
"versions": "1+",
+        "about": "The subtopologies that make up this topology." },
+      { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", 
"versions": "1+",
+        "about": "Global state stores used by this topology." }
+    ]},
+    { "name": "TopologyDescriptionSubtopology", "versions": "1+", "fields": [
+      { "name": "SubtopologyId", "type": "string", "versions": "1+",
+        "about": "The subtopology identifier, unique within the topology." },
+      { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "1+",
+        "about": "The processing nodes in this subtopology." }
+    ]},
+    { "name": "TopologyDescriptionNode", "versions": "1+", "fields": [
+      { "name": "Name", "type": "string", "versions": "1+",
+        "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." },
+      { "name": "NodeType", "type": "int8", "versions": "1+",
+        "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." },
+      { "name": "SourceTopics", "type": "[]string", "versions": "1+", 
"entityType": "topicName",
+        "about": "The source topics this node reads from. Defined only for 
source nodes, may be empty if source topics are dynamically determined." },
+      { "name": "SinkTopic", "type": "string", "versions": "1+", "entityType": 
"topicName",
+        "nullableVersions": "1+", "default": "null",
+        "about": "The topic this node writes to. Defined only for sink nodes, 
may be null if sink topic is dynamically determined." },
+      { "name": "Stores", "type": "[]string", "versions": "1+",
+        "about": "The state store names accessed by this node. Defined only 
for processor nodes." },
+      { "name": "Successors", "type": "[]string", "versions": "1+",
+        "about": "The names of successor nodes in the processing graph. 
Predecessor relationships are reconstructed from this field." }
+    ]},
+    { "name": "TopologyDescriptionGlobalStore", "versions": "1+", "fields": [
+      { "name": "Source", "type": "TopologyDescriptionNode", "versions": "1+",
+        "about": "The source node providing data to the global store." },
+      { "name": "Processor", "type": "TopologyDescriptionNode", "versions": 
"1+",
+        "about": "The processor node that populates the global store." }
     ]}
   ]
-}
\ No newline at end of file
+}
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index a2cba46e763..dd03c28d07c 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -18,7 +18,12 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "StreamsGroupHeartbeatRequest",
-  "validVersions": "0",
+  // Version 1 is the same as version 0; bumped together with 
StreamsGroupHeartbeatResponse v1,
+  // which adds TopologyDescriptionRequired (KIP-1331). Required so the 
response v1 is negotiated.
+  // Marked unstable until the broker topology-description manager lands 
(KIP-1331); flip to false
+  // before release.
+  "latestVersionUnstable": true,
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index 538816f4129..6422712ba52 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -17,7 +17,8 @@
   "apiKey": 88,
   "type": "response",
   "name": "StreamsGroupHeartbeatResponse",
-  "validVersions": "0",
+  // Version 1 adds TopologyDescriptionRequired (KIP-1331).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -65,6 +66,9 @@
     { "name": "WarmupTasks", "type": "[]TaskIds", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Assigned warm-up tasks for this client. Null if unchanged 
since last heartbeat." },
 
+    { "name": "TopologyDescriptionRequired", "type": "bool", "versions": "1+", 
"default": "false",
+      "about": "True if the broker does not have an up-to-date topology 
description for this group. The client should send the topology description via 
StreamsGroupTopologyDescriptionUpdate." },
+
     // IQ-related information
     { "name": "EndpointInformationEpoch", "type": "int32", "versions": "0+",
       "about": "The endpoint epoch set in the response"},
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json
 
b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json
new file mode 100644
index 00000000000..019ee189721
--- /dev/null
+++ 
b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateRequest.json
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 93,
+  "type": "request",
+  "listeners": ["broker"],
+  "name": "StreamsGroupTopologyDescriptionUpdateRequest",
+  // The broker handler is a stub returning UNSUPPORTED_VERSION; the real 
handler lands in a
+  // later sub-task of KIP-1331. Set latestVersionUnstable to false before release.
+  "latestVersionUnstable": true,
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+      "about": "The streams group identifier." },
+    { "name": "MemberId", "type": "string", "versions": "0+",
+      "about": "The ID of the streams group member sending the push. The 
broker validates that this member is still in the group; mismatches (including 
when the group itself has been deleted) are rejected with UNKNOWN_MEMBER_ID so 
the client treats itself as fenced and rejoins." },
+    { "name": "TopologyEpoch", "type": "int32", "versions": "0+",
+      "about": "The epoch of the topology being described." },
+    { "name": "TopologyDescription", "type": "TopologyDescription", 
"versions": "0+",
+      "about": "The topology description." }
+  ],
+  "commonStructs": [
+    { "name": "TopologyDescription", "versions": "0+", "fields": [
+      { "name": "Subtopologies", "type": "[]TopologyDescriptionSubtopology", 
"versions": "0+",
+        "about": "The subtopologies that make up this topology." },
+      { "name": "GlobalStores", "type": "[]TopologyDescriptionGlobalStore", 
"versions": "0+",
+        "about": "Global state stores used by this topology." }
+    ]},
+    { "name": "TopologyDescriptionSubtopology", "versions": "0+", "fields": [
+      { "name": "SubtopologyId", "type": "string", "versions": "0+",
+        "about": "The subtopology identifier, unique within the topology." },
+      { "name": "Nodes", "type": "[]TopologyDescriptionNode", "versions": "0+",
+        "about": "The processing nodes in this subtopology." }
+    ]},
+    { "name": "TopologyDescriptionNode", "versions": "0+", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+",
+        "about": "The name of this node (e.g., KSTREAM-SOURCE-0000000000)." },
+      { "name": "NodeType", "type": "int8", "versions": "0+",
+        "about": "The type of this node: 1=SOURCE, 2=PROCESSOR, 3=SINK." },
+      { "name": "SourceTopics", "type": "[]string", "versions": "0+", 
"entityType": "topicName",
+        "about": "The source topics this node reads from. Defined only for 
source nodes; may be empty if the source topics are dynamically determined." },
+      { "name": "SinkTopic", "type": "string", "versions": "0+", "entityType": 
"topicName",
+        "nullableVersions": "0+", "default": "null",
+        "about": "The topic this node writes to. Defined only for sink nodes; 
may be null if the sink topic is dynamically determined." },
+      { "name": "Stores", "type": "[]string", "versions": "0+",
+        "about": "The state store names accessed by this node. Defined only 
for processor nodes." },
+      { "name": "Successors", "type": "[]string", "versions": "0+",
+        "about": "The names of successor nodes in the processing graph. 
Predecessor relationships are reconstructed from this field." }
+    ]},
+    { "name": "TopologyDescriptionGlobalStore", "versions": "0+", "fields": [
+      { "name": "Source", "type": "TopologyDescriptionNode", "versions": "0+",
+        "about": "The source node providing data to the global store." },
+      { "name": "Processor", "type": "TopologyDescriptionNode", "versions": 
"0+",
+        "about": "The processor node that populates the global store." }
+    ]}
+  ]
+}
diff --git 
a/clients/src/main/resources/common/message/DeleteGroupsResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json
similarity index 54%
copy from clients/src/main/resources/common/message/DeleteGroupsResponse.json
copy to 
clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json
index 168cde03ba3..e03b4215cd3 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupTopologyDescriptionUpdateResponse.json
@@ -14,23 +14,28 @@
 // limitations under the License.
 
 {
-  "apiKey": 42,
+  "apiKey": 93,
   "type": "response",
-  "name": "DeleteGroupsResponse",
-  // Starting in version 1, on quota violation, brokers send out responses 
before throttling.
-  //
-  // Version 2 is the first flexible version.
-  "validVersions": "0-2",
-  "flexibleVersions": "2+",
+  "name": "StreamsGroupTopologyDescriptionUpdateResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED (version 0+)
+  // - NOT_COORDINATOR (version 0+)
+  // - COORDINATOR_NOT_AVAILABLE (version 0+)
+  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+  // - INVALID_REQUEST (version 0+)
+  // - UNSUPPORTED_VERSION (version 0+)
+  // - UNKNOWN_MEMBER_ID (version 0+)
+  // - GROUP_ID_NOT_FOUND (version 0+)
+  // - STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED (version 0+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-    { "name": "Results", "type": "[]DeletableGroupResult", "versions": "0+",
-      "about": "The deletion results.", "fields": [
-      { "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "groupId",
-        "about": "The group id." },
-      { "name": "ErrorCode", "type": "int16", "versions": "0+",
-        "about": "The deletion error, or 0 if the deletion succeeded." }
-    ]}
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error." },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+",
+      "nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." }
   ]
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 29bc1213c63..26a13f561bb 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -234,6 +234,8 @@ import 
org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -703,6 +705,17 @@ public class RequestResponseTest {
         assertTrue(exception.getMessage().contains("[foo, bar]"));
     }
 
+    @Test
+    public void testDeleteGroupsResponseV3PreservesErrorMessage() {
+        DeleteGroupsResponse response = createDeleteGroupsResponse();
+        short version = ApiKeys.DELETE_GROUPS.latestVersion();
+        DeleteGroupsResponse parsed = 
DeleteGroupsResponse.parse(response.serialize(version), version);
+        DeletableGroupResult failed = 
parsed.data().results().find("failed-group");
+        assertNotNull(failed);
+        assertEquals(Errors.GROUP_DELETION_FAILED.code(), failed.errorCode());
+        assertEquals("plugin offline", failed.errorMessage());
+    }
+
     @Test
     public void testFetchRequestIsolationLevel() {
         FetchRequest request = createFetchRequest((short) 4, 
IsolationLevel.READ_COMMITTED);
@@ -1076,6 +1089,7 @@ public class RequestResponseTest {
             case DESCRIBE_SHARE_GROUP_OFFSETS: return 
createDescribeShareGroupOffsetsRequest(version);
             case ALTER_SHARE_GROUP_OFFSETS: return 
createAlterShareGroupOffsetsRequest(version);
             case DELETE_SHARE_GROUP_OFFSETS: return 
createDeleteShareGroupOffsetsRequest(version);
+            case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: return 
createStreamsGroupTopologyDescriptionUpdateRequest(version);
             default: throw new IllegalArgumentException("Unknown API key " + 
apikey);
         }
     }
@@ -1166,11 +1180,12 @@ public class RequestResponseTest {
             case WRITE_SHARE_GROUP_STATE: return 
createWriteShareGroupStateResponse();
             case DELETE_SHARE_GROUP_STATE: return 
createDeleteShareGroupStateResponse();
             case READ_SHARE_GROUP_STATE_SUMMARY: return 
createReadShareGroupStateSummaryResponse();
-            case STREAMS_GROUP_HEARTBEAT: return 
createStreamsGroupHeartbeatResponse();
-            case STREAMS_GROUP_DESCRIBE: return 
createStreamsGroupDescribeResponse();
+            case STREAMS_GROUP_HEARTBEAT: return 
createStreamsGroupHeartbeatResponse(version);
+            case STREAMS_GROUP_DESCRIBE: return 
createStreamsGroupDescribeResponse(version);
             case DESCRIBE_SHARE_GROUP_OFFSETS: return 
createDescribeShareGroupOffsetsResponse();
             case ALTER_SHARE_GROUP_OFFSETS: return 
createAlterShareGroupOffsetsResponse();
             case DELETE_SHARE_GROUP_OFFSETS: return 
createDeleteShareGroupOffsetsResponse();
+            case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE: return 
createStreamsGroupTopologyDescriptionUpdateResponse();
             default: throw new IllegalArgumentException("Unknown API key " + 
apikey);
         }
     }
@@ -2286,6 +2301,10 @@ public class RequestResponseTest {
         result.add(new DeletableGroupResult()
                        .setGroupId("test-group")
                        .setErrorCode(Errors.NONE.code()));
+        result.add(new DeletableGroupResult()
+                       .setGroupId("failed-group")
+                       .setErrorCode(Errors.GROUP_DELETION_FAILED.code())
+                       .setErrorMessage("plugin offline"));
         return new DeleteGroupsResponse(
             new DeleteGroupsResponseData()
                 .setResults(result)
@@ -3857,20 +3876,42 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createStreamsGroupDescribeRequest(final short 
version) {
-        return new StreamsGroupDescribeRequest.Builder(new 
StreamsGroupDescribeRequestData()
+        StreamsGroupDescribeRequestData data = new 
StreamsGroupDescribeRequestData()
             .setGroupIds(Collections.singletonList("group"))
-            .setIncludeAuthorizedOperations(false)).build(version);
+            .setIncludeAuthorizedOperations(false);
+        if (version >= 1) {
+            data.setIncludeTopologyDescription(true);
+        }
+        return new StreamsGroupDescribeRequest.Builder(data).build(version);
     }
 
     private AbstractRequest createStreamsGroupHeartbeatRequest(final short 
version) {
         return new StreamsGroupHeartbeatRequest.Builder(new 
StreamsGroupHeartbeatRequestData()).build(version);
     }
 
-    private AbstractResponse createStreamsGroupDescribeResponse() {
-        StreamsGroupDescribeResponseData data = new 
StreamsGroupDescribeResponseData()
-            .setGroups(Collections.singletonList(
+    private AbstractResponse createStreamsGroupDescribeResponse(final short 
version) {
+        StreamsGroupDescribeResponseData.DescribedGroup group =
+            new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("group")
+                .setErrorCode((short) 0)
+                .setErrorMessage(Errors.forCode((short) 0).message())
+                .setGroupState("EMPTY")
+                .setGroupEpoch(0)
+                .setAssignmentEpoch(0)
+                .setMembers(new ArrayList<>(0))
+                .setTopology(null);
+        if (version >= 1) {
+            group.setTopologyDescription(new 
StreamsGroupDescribeResponseData.TopologyDescription()
+                .setSubtopologies(new ArrayList<>(0))
+                .setGlobalStores(new ArrayList<>(0)));
+            
group.setTopologyDescriptionStatus(StreamsGroupDescribeResponse.TOPOLOGY_DESCRIPTION_STATUS_AVAILABLE);
+        }
+        List<StreamsGroupDescribeResponseData.DescribedGroup> groups = new 
ArrayList<>();
+        groups.add(group);
+        if (version >= 1) {
+            StreamsGroupDescribeResponseData.DescribedGroup notStoredGroup =
                 new StreamsGroupDescribeResponseData.DescribedGroup()
-                    .setGroupId("group")
+                    .setGroupId("group-without-description")
                     .setErrorCode((short) 0)
                     .setErrorMessage(Errors.forCode((short) 0).message())
                     .setGroupState("EMPTY")
@@ -3878,13 +3919,82 @@ public class RequestResponseTest {
                     .setAssignmentEpoch(0)
                     .setMembers(new ArrayList<>(0))
                     .setTopology(null)
-            ))
+                    .setTopologyDescription(null)
+                    
.setTopologyDescriptionStatus(StreamsGroupDescribeResponse.TOPOLOGY_DESCRIPTION_STATUS_NOT_STORED);
+            groups.add(notStoredGroup);
+        }
+        StreamsGroupDescribeResponseData data = new 
StreamsGroupDescribeResponseData()
+            .setGroups(groups)
             .setThrottleTimeMs(1000);
         return new StreamsGroupDescribeResponse(data);
     }
 
-    private AbstractResponse createStreamsGroupHeartbeatResponse() {
-        return new StreamsGroupHeartbeatResponse(new 
StreamsGroupHeartbeatResponseData());
+    private AbstractResponse createStreamsGroupHeartbeatResponse(final short 
version) {
+        StreamsGroupHeartbeatResponseData data = new 
StreamsGroupHeartbeatResponseData();
+        if (version >= 1) {
+            data.setTopologyDescriptionRequired(true);
+        }
+        return new StreamsGroupHeartbeatResponse(data);
+    }
+
+    private AbstractRequest 
createStreamsGroupTopologyDescriptionUpdateRequest(final short version) {
+        
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode 
sourceNode =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+                .setName("KSTREAM-SOURCE-0000000000")
+                .setNodeType((byte) 1)
+                .setSourceTopics(List.of("input-topic"))
+                .setSuccessors(List.of("KSTREAM-PROCESSOR-0000000001"));
+        
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode 
processorNode =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+                .setName("KSTREAM-PROCESSOR-0000000001")
+                .setNodeType((byte) 2)
+                .setStores(List.of("store-1"))
+                .setSuccessors(List.of("KSTREAM-SINK-0000000002"));
+        
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode 
sinkNode =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+                .setName("KSTREAM-SINK-0000000002")
+                .setNodeType((byte) 3)
+                .setSinkTopic("output-topic");
+        
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode 
dynamicSinkNode =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+                .setName("KSTREAM-SINK-0000000005")
+                .setNodeType((byte) 3)
+                .setSinkTopic(null);
+        
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology 
subtopology =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology()
+                .setSubtopologyId("0")
+                .setNodes(List.of(sourceNode, processorNode, sinkNode, 
dynamicSinkNode));
+        
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode 
globalSource =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+                .setName("KSTREAM-GLOBAL-SOURCE-0000000003")
+                .setNodeType((byte) 1)
+                .setSourceTopics(List.of("global-topic"));
+        
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode 
globalProcessor =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+                .setName("KTABLE-SOURCE-0000000004")
+                .setNodeType((byte) 2)
+                .setStores(List.of("global-store"));
+        
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore 
globalStore =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore()
+                .setSource(globalSource)
+                .setProcessor(globalProcessor);
+        StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription 
topology =
+            new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription()
+                .setSubtopologies(List.of(subtopology))
+                .setGlobalStores(List.of(globalStore));
+        return new StreamsGroupTopologyDescriptionUpdateRequest.Builder(
+            new StreamsGroupTopologyDescriptionUpdateRequestData()
+                .setGroupId("test-group")
+                .setMemberId("test-member")
+                .setTopologyEpoch(1)
+                .setTopologyDescription(topology)
+        ).build(version);
+    }
+
+    private AbstractResponse 
createStreamsGroupTopologyDescriptionUpdateResponse() {
+        return new StreamsGroupTopologyDescriptionUpdateResponse(
+            new StreamsGroupTopologyDescriptionUpdateResponseData()
+        );
     }
 
     @Test
@@ -4018,4 +4128,24 @@ public class RequestResponseTest {
                 assertThrows(UnsupportedVersionException.class, () -> new 
ListConfigResourcesRequest.Builder(data).build((short) 0));
             });
     }
+
+    @Test
+    public void 
testStreamsGroupDescribeRequestV0RejectsIncludeTopologyDescription() {
+        StreamsGroupDescribeRequestData data = new 
StreamsGroupDescribeRequestData()
+            .setGroupIds(List.of("g1"))
+            .setIncludeTopologyDescription(true);
+        StreamsGroupDescribeRequest request = new 
StreamsGroupDescribeRequest.Builder(data).build((short) 0);
+        assertThrows(UnsupportedVersionException.class, () -> 
request.serialize());
+    }
+
+    @Test
+    public void 
testStreamsGroupDescribeResponseV0RejectsTopologyDescriptionFields() {
+        StreamsGroupDescribeResponseData data = new 
StreamsGroupDescribeResponseData()
+            .setGroups(List.of(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("g1")
+                .setTopologyDescription(new 
StreamsGroupDescribeResponseData.TopologyDescription())
+                
.setTopologyDescriptionStatus(StreamsGroupDescribeResponse.TOPOLOGY_DESCRIPTION_STATUS_AVAILABLE)));
+        StreamsGroupDescribeResponse response = new 
StreamsGroupDescribeResponse(data);
+        assertThrows(UnsupportedVersionException.class, () -> 
response.serialize((short) 0));
+    }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 03bac25b016..a530454cb91 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -248,6 +248,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_SHARE_GROUP_OFFSETS => 
handleDeleteShareGroupOffsetsRequest(request).exceptionally(handleError)
         case ApiKeys.STREAMS_GROUP_DESCRIBE => 
handleStreamsGroupDescribe(request).exceptionally(handleError)
         case ApiKeys.STREAMS_GROUP_HEARTBEAT => 
handleStreamsGroupHeartbeat(request).exceptionally(handleError)
+        case ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE => 
handleStreamsGroupTopologyDescriptionUpdate(request).exceptionally(handleError)
         case _ => throw new IllegalStateException(s"No handler for request api 
key ${request.header.apiKey}")
       }
     } catch {
@@ -2897,6 +2898,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  // Stub handler for KIP-1331. The full handler lands in a later sub-task; 
until then this
+  // responds with UNSUPPORTED_VERSION so callers fail loudly rather than hit 
the IllegalStateException
+  // default branch in handle().
+  def handleStreamsGroupTopologyDescriptionUpdate(request: Request): 
CompletableFuture[Unit] = {
+    val updateRequest = 
request.body(classOf[StreamsGroupTopologyDescriptionUpdateRequest])
+    requestHelper.sendMaybeThrottle(request, 
updateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    CompletableFuture.completedFuture[Unit](())
+  }
+
   def handleStreamsGroupDescribe(request: Request): CompletableFuture[Unit] = {
     val streamsGroupDescribeRequest = 
request.body(classOf[StreamsGroupDescribeRequest])
     val includeAuthorizedOperations = 
streamsGroupDescribeRequest.data.includeAuthorizedOperations
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 639328798a0..02c523f0d3f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -758,6 +758,9 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.STREAMS_GROUP_DESCRIBE =>
           new StreamsGroupDescribeRequest.Builder(new 
StreamsGroupDescribeRequestData())
 
+        case ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE =>
+          new StreamsGroupTopologyDescriptionUpdateRequest.Builder(new 
StreamsGroupTopologyDescriptionUpdateRequestData())
+
         case ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS =>
           new DescribeShareGroupOffsetsRequest.Builder(new 
DescribeShareGroupOffsetsRequestData())
 
diff --git 
a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java 
b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
index 09602ea498d..023ef02a5f6 100644
--- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -179,6 +179,8 @@ import 
org.apache.kafka.common.message.StreamsGroupDescribeRequestDataJsonConver
 import 
org.apache.kafka.common.message.StreamsGroupDescribeResponseDataJsonConverter;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestDataJsonConverter;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseDataJsonConverter;
 import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter;
 import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter;
@@ -362,6 +364,8 @@ import 
org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
 import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import 
org.apache.kafka.common.requests.StreamsGroupTopologyDescriptionUpdateRequest;
+import 
org.apache.kafka.common.requests.StreamsGroupTopologyDescriptionUpdateResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -564,6 +568,8 @@ public class RequestConvertToJson {
                 
UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) 
request).data(), request.version());
             case UPDATE_RAFT_VOTER ->
                 
UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) 
request).data(), request.version());
+            case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE ->
+                
StreamsGroupTopologyDescriptionUpdateRequestDataJsonConverter.write(((StreamsGroupTopologyDescriptionUpdateRequest)
 request).data(), request.version());
             case VOTE -> VoteRequestDataJsonConverter.write(((VoteRequest) 
request).data(), request.version());
             case WRITE_SHARE_GROUP_STATE ->
                 
WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest)
 request).data(), request.version());
@@ -741,6 +747,8 @@ public class RequestConvertToJson {
                 
UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) 
response).data(), version);
             case UPDATE_RAFT_VOTER ->
                 
UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) 
response).data(), version);
+            case STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE ->
+                
StreamsGroupTopologyDescriptionUpdateResponseDataJsonConverter.write(((StreamsGroupTopologyDescriptionUpdateResponse)
 response).data(), version);
             case VOTE -> VoteResponseDataJsonConverter.write(((VoteResponse) 
response).data(), version);
             case WRITE_SHARE_GROUP_STATE ->
                 
WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse)
 response).data(), version);

Reply via email to