This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 6a4aa0cf1d40d0585efd4b13f21d9ed6691baa7f
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Aug 15 15:32:21 2024 +0200

    Resolve conflicts from 11/25 trunk rebase - Implement StreamsGroupDescribe RPC handling
    
    Implement the StreamsGroupDescribe RPC handling for KIP-1071.
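    
    As an illustration only (not part of this commit), a minimal Java sketch of
    how the new request class and its error helper can be exercised, mirroring
    RequestResponseTest below; the group id "group" and version 0 are
    illustrative assumptions:
    
        import java.util.Collections;
        import java.util.List;
        
        import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
        import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
        import org.apache.kafka.common.protocol.Errors;
        import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
        
        public class StreamsGroupDescribeExample {
            public static void main(String[] args) {
                // Build a describe request for a single group, as a client would.
                StreamsGroupDescribeRequest request = new StreamsGroupDescribeRequest.Builder(
                    new StreamsGroupDescribeRequestData()
                        .setGroupIds(Collections.singletonList("group"))
                        .setIncludeAuthorizedOperations(false)
                ).build((short) 0);
        
                // On a request-level failure, the new helper maps every requested
                // group id to the same error code.
                List<StreamsGroupDescribeResponseData.DescribedGroup> groups =
                    StreamsGroupDescribeRequest.getErrorDescribedGroupList(
                        request.data().groupIds(), Errors.COORDINATOR_NOT_AVAILABLE);
                System.out.println(groups);
            }
        }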
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |   2 +-
 .../kafka/common/requests/AbstractRequest.java     |   2 +
 .../kafka/common/requests/AbstractResponse.java    |   2 +
 .../requests/StreamsGroupDescribeRequest.java      |  98 +++++++++
 .../requests/StreamsGroupDescribeResponse.java     |  76 +++++++
 .../message/StreamsGroupDescribeRequest.json       |   2 +-
 .../message/StreamsGroupDescribeResponse.json      |  10 +-
 .../message/StreamsGroupHeartbeatRequest.json      |   2 +-
 .../message/StreamsGroupInitializeRequest.json     |   2 +-
 .../StreamsGroupHeartbeatRequestManagerTest.java   |   3 +-
 .../kafka/common/requests/RequestResponseTest.java |  57 +++---
 .../group/GroupCoordinatorAdapter.scala            |  11 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  66 +++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   7 +
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 224 +++++++++++++++++----
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   6 +
 gradle/spotbugs-exclude.xml                        |   5 +
 .../kafka/coordinator/group/GroupCoordinator.java  |  14 ++
 .../coordinator/group/GroupCoordinatorService.java |  53 +++++
 .../coordinator/group/GroupCoordinatorShard.java   |  16 ++
 .../coordinator/group/GroupMetadataManager.java    |  35 +++-
 .../group/streams/CurrentAssignmentBuilder.java    |   8 +-
 .../coordinator/group/streams/StreamsGroup.java    | 109 ++--------
 .../group/streams/StreamsGroupMember.java          | 109 +++++-----
 .../coordinator/group/streams/StreamsTopology.java |  33 +++
 .../group/streams/TargetAssignmentBuilder.java     |  19 --
 .../group/GroupCoordinatorServiceTest.java         | 138 +++++++++++++
 .../group/GroupCoordinatorShardTest.java           |   2 +-
 .../group/GroupMetadataManagerTest.java            | 112 +++++++++++
 .../group/GroupMetadataManagerTestContext.java     |  94 ++++++++-
 .../group/streams/StreamsGroupBuilder.java         | 101 ++++++++++
 .../group/streams/StreamsGroupMemberTest.java      | 127 +++++++++++-
 .../group/streams/StreamsGroupTest.java            |   4 +-
 .../group/streams/StreamsTopologyTest.java         | 182 +++++++++++++++++
 .../group/streams/TargetAssignmentBuilderTest.java |   5 +-
 35 files changed, 1463 insertions(+), 273 deletions(-)

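As a usage note for the KafkaConfig change below: the new protocol is enabled
through the rebalance-protocols list on a KRaft broker. A hypothetical
broker.properties fragment, for illustration only:

    process.roles=broker
    group.coordinator.rebalance.protocols=classic,streams

With this patch, configuring "streams" on a ZooKeeper-based broker fails with a
ConfigException, and enabling it logs a warning that KIP-1071 is in preview.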
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index a53b082c997..345b9b808a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -131,8 +131,8 @@ public enum ApiKeys {
     WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
     DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
     READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
-    STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT),
     STREAMS_GROUP_INITIALIZE(ApiMessageType.STREAMS_GROUP_INITIALIZE),
+    STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT),
     STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE);
     
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 3d99419987d..4e5d21251f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -354,6 +354,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
                 return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion);
             case STREAMS_GROUP_INITIALIZE:
                 return StreamsGroupInitializeRequest.parse(buffer, apiVersion);
+            case STREAMS_GROUP_DESCRIBE:
+                return StreamsGroupDescribeRequest.parse(buffer, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 9e25cfd99da..25f3afe0d5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -291,6 +291,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
                 return StreamsGroupHeartbeatResponse.parse(responseBuffer, version);
             case STREAMS_GROUP_INITIALIZE:
                 return StreamsGroupInitializeResponse.parse(responseBuffer, version);
+            case STREAMS_GROUP_DESCRIBE:
+                return StreamsGroupDescribeResponse.parse(responseBuffer, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
new file mode 100644
index 00000000000..2a3834d16a8
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeRequest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+public class StreamsGroupDescribeRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<StreamsGroupDescribeRequest> {
+
+        private final StreamsGroupDescribeRequestData data;
+
+        public Builder(StreamsGroupDescribeRequestData data) {
+            this(data, false);
+        }
+
+        public Builder(StreamsGroupDescribeRequestData data, boolean enableUnstableLastVersion) {
+            super(ApiKeys.STREAMS_GROUP_DESCRIBE, enableUnstableLastVersion);
+            this.data = data;
+        }
+
+        @Override
+        public StreamsGroupDescribeRequest build(short version) {
+            return new StreamsGroupDescribeRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final StreamsGroupDescribeRequestData data;
+
+    public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short version) {
+        super(ApiKeys.STREAMS_GROUP_DESCRIBE, version);
+        this.data = data;
+    }
+
+    @Override
+    public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
+            .setThrottleTimeMs(throttleTimeMs);
+        // Set error for each group
+        this.data.groupIds().forEach(
+            groupId -> data.groups().add(
+                new StreamsGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId(groupId)
+                    .setErrorCode(Errors.forException(e).code())
+            )
+        );
+        return new StreamsGroupDescribeResponse(data);
+    }
+
+    @Override
+    public StreamsGroupDescribeRequestData data() {
+        return data;
+    }
+
+    public static StreamsGroupDescribeRequest parse(ByteBuffer buffer, short version) {
+        return new StreamsGroupDescribeRequest(
+            new StreamsGroupDescribeRequestData(new ByteBufferAccessor(buffer), version),
+            version
+        );
+    }
+
+    public static List<StreamsGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(
+        List<String> groupIds,
+        Errors error
+    ) {
+        return groupIds.stream()
+            .map(groupId -> new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId)
+                .setErrorCode(error.code())
+            ).collect(Collectors.toList());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
new file mode 100644
index 00000000000..cf1d5529623
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#INVALID_GROUP_ID}
+ * - {@link Errors#GROUP_ID_NOT_FOUND}
+ */
+public class StreamsGroupDescribeResponse extends AbstractResponse {
+
+    private final StreamsGroupDescribeResponseData data;
+
+    public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data) {
+        super(ApiKeys.STREAMS_GROUP_DESCRIBE);
+        this.data = data;
+    }
+
+    @Override
+    public StreamsGroupDescribeResponseData data() {
+        return data;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        HashMap<Errors, Integer> counts = new HashMap<>();
+        data.groups().forEach(
+            group -> updateErrorCounts(counts, Errors.forCode(group.errorCode()))
+        );
+        return counts;
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    public static StreamsGroupDescribeResponse parse(ByteBuffer buffer, short version) {
+        return new StreamsGroupDescribeResponse(
+            new StreamsGroupDescribeResponseData(new ByteBufferAccessor(buffer), version)
+        );
+    }
+}
diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
index 17b4abac9d2..b7047141e8a 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 90,
   "type": "request",
-  "listeners": ["broker"],
+  "listeners": ["broker", "zkBroker"],
   "name": "StreamsGroupDescribeRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 1093100992f..9cf580f0441 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -45,8 +45,8 @@
           "about": "The group epoch." },
         { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
           "about": "The assignment epoch." },
-        { "name":  "Topology", "type": "[]Subtopology", "versions": "0+",
-          "about": "The sub-topologies of the streams application.",
+        { "name":  "Topology", "type": "[]Subtopology", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+          "about": "The sub-topologies of the streams application. Null if 
uninitialized.",
           "fields": [
             { "name": "Subtopology", "type": "string", "versions": "0+",
               "about": "String to uniquely identify the sub-topology." },
@@ -54,8 +54,8 @@
               "about": "The topics the topology reads from." },
             { "name": "SourceTopicRegex", "type": "string", "versions": "0+",
               "about": "The regular expressions identifying topics the 
topology reads from." },
-            { "name": "SinkTopics", "type": "[]string", "versions": "0+",
-              "about": "The topics the topology writes to." },
+            { "name": "RepartitionSinkTopics", "type": "[]string", "versions": 
"0+",
+              "about": "The repartition topics the topology writes to." },
             { "name": "StateChangelogTopics", "type": "[]TopicInfo", 
"versions": "0+",
               "about": "The set of state changelog topics associated with this 
sub-topology. Created automatically." },
             { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", 
"versions": "0+",
@@ -82,7 +82,7 @@
             { "name": "TopologyId", "type": "string", "versions": "0+",
               "about": "The ID of the topology. Must be non-empty." },
 
-            { "name": "ProcessId", "type": "uuid", "versions": "0+",
+            { "name": "ProcessId", "type": "string", "versions": "0+",
               "about": "Identity of the streams instance that may have 
multiple clients. " },
             { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
               "about": "Used for rack-aware assignment algorithm." },
diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index ac765b2269c..7f829f1e410 100644
--- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 89,
   "type": "request",
-  "listeners": ["broker"],
+  "listeners": ["broker", "zkBroker"],
   "name": "StreamsGroupHeartbeatRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
index 169629fb2c0..602b557e3e9 100644
--- a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 88,
   "type": "request",
-  "listeners": ["broker"],
+  "listeners": ["broker", "zkBroker"],
   "name": "StreamsGroupInitializeRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index bc91fc5df83..f4c67e1d9b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -214,7 +214,8 @@ class StreamsGroupHeartbeatRequestManagerTest {
         StreamsGroupHeartbeatRequest request = (StreamsGroupHeartbeatRequest) result.unsentRequests.get(0).requestBuilder().build();
 
         assertEquals(processID.toString(), request.data().processId());
-        assertEquals(endPoint, request.data().userEndpoint());
+        assertEquals(endPoint.host, request.data().userEndpoint().host());
+        assertEquals(endPoint.port, request.data().userEndpoint().port());
         assertEquals(1, request.data().clientTags().size());
         assertEquals("clientTag1", request.data().clientTags().get(0).key());
         assertEquals("value2", request.data().clientTags().get(0).value());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5bf5baf7c8b..1e5a1c4c864 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -234,6 +234,8 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState;
 import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState;
 import org.apache.kafka.common.message.StopReplicaResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
@@ -1127,8 +1129,9 @@ public class RequestResponseTest {
             case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateRequest(version);
             case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateRequest(version);
             case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version);
-            case STREAMS_GROUP_HEARTBEAT: return createStreamsHeartbeatRequest(version);
-            case STREAMS_GROUP_INITIALIZE: return createStreamsInitializeRequest(version);
+            case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatRequest(version);
+            case STREAMS_GROUP_INITIALIZE: return createStreamsGroupInitializeRequest(version);
+            case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeRequest(version);
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
@@ -1223,8 +1226,9 @@ public class RequestResponseTest {
             case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateResponse();
             case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse();
             case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse();
-            case STREAMS_GROUP_HEARTBEAT: return createStreamsHeartbeatResponse();
-            case STREAMS_GROUP_INITIALIZE: return createStreamsInitializeResponse();
+            case STREAMS_GROUP_HEARTBEAT: return createStreamsGroupHeartbeatResponse();
+            case STREAMS_GROUP_INITIALIZE: return createStreamsGroupInitializeResponse();
+            case STREAMS_GROUP_DESCRIBE: return createStreamsGroupDescribeResponse();
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
@@ -4023,35 +4027,42 @@ public class RequestResponseTest {
         return new ReadShareGroupStateSummaryResponse(data);
     }
 
-//    private AbstractRequest createStreamsPrepareAssignmentRequest(final short version) {
-//        return new StreamsPrepareAssignmentRequest.Builder(new StreamsPrepareAssignmentRequestData()).build(version);
-//    }
-//
-//    private AbstractRequest createStreamsInstallAssignmentRequest(final short version) {
-//        return new StreamsInstallAssignmentRequest.Builder(new StreamsInstallAssignmentRequestData()).build(version);
-//    }
+    private AbstractRequest createStreamsGroupDescribeRequest(final short version) {
+        return new StreamsGroupDescribeRequest.Builder(new StreamsGroupDescribeRequestData()
+            .setGroupIds(Collections.singletonList("group"))
+            .setIncludeAuthorizedOperations(false)).build(version);
+    }
 
-    private AbstractRequest createStreamsInitializeRequest(final short version) {
+    private AbstractRequest createStreamsGroupInitializeRequest(final short version) {
         return new StreamsGroupInitializeRequest.Builder(new StreamsGroupInitializeRequestData()).build(version);
     }
 
-    private AbstractRequest createStreamsHeartbeatRequest(final short version) {
+    private AbstractRequest createStreamsGroupHeartbeatRequest(final short version) {
         return new StreamsGroupHeartbeatRequest.Builder(new StreamsGroupHeartbeatRequestData()).build(version);
     }
 
-//    private AbstractResponse createStreamsPrepareAssignmentResponse() {
-//        return new StreamsPrepareAssignmentResponse(new StreamsPrepareAssignmentResponseData());
-//    }
-//
-//    private AbstractResponse createStreamsInstallAssignmentResponse() {
-//        return new StreamsInstallAssignmentResponse(new StreamsInstallAssignmentResponseData());
-//    }
-//
-    private AbstractResponse createStreamsInitializeResponse() {
+    private AbstractResponse createStreamsGroupDescribeResponse() {
+        StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
+            .setGroups(Collections.singletonList(
+                new StreamsGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId("group")
+                    .setErrorCode((short) 0)
+                    .setErrorMessage(Errors.forCode((short) 0).message())
+                    .setGroupState("EMPTY")
+                    .setGroupEpoch(0)
+                    .setAssignmentEpoch(0)
+                    .setMembers(new ArrayList<>(0))
+                    .setTopology(new ArrayList<>(0))
+            ))
+            .setThrottleTimeMs(1000);
+        return new StreamsGroupDescribeResponse(data);
+    }
+
+    private AbstractResponse createStreamsGroupInitializeResponse() {
         return new StreamsGroupInitializeResponse(new StreamsGroupInitializeResponseData());
     }
 
-    private AbstractResponse createStreamsHeartbeatResponse() {
+    private AbstractResponse createStreamsGroupHeartbeatResponse() {
         return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData());
     }
 
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 74f30f0d765..a48b2c6ffc8 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -18,7 +18,7 @@ package kafka.coordinator.group
 
 import kafka.server.{KafkaConfig, ReplicaManager}
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
+import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, Offset [...]
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
@@ -672,6 +672,15 @@ private[group] class GroupCoordinatorAdapter(
     ))
   }
 
+  override def streamsGroupDescribe(
+    context: RequestContext,
+    groupIds: util.List[String]
+  ): CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]] = {
+    FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
+      s"The old group coordinator does not support ${ApiKeys.STREAMS_GROUP_DESCRIBE.name} API."
+    ))
+  }
+
   override def shareGroupDescribe(
     context: RequestContext,
     groupIds: util.List[String]
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9b22b16a55f..d3bfd420ace 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -276,6 +276,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request)
         case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request)
         case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request)
+        case ApiKeys.STREAMS_GROUP_DESCRIBE => handleStreamsGroupDescribe(request).exceptionally(handleError)
         case ApiKeys.STREAMS_GROUP_INITIALIZE => handleStreamsInitialize(request).exceptionally(handleError)
         case ApiKeys.STREAMS_GROUP_HEARTBEAT => handleStreamsHeartbeat(request).exceptionally(handleError)
         case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
@@ -3885,12 +3886,16 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   }
 
+  private def isStreamsGroupProtocolEnabled(): Boolean = {
+    config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
+  }
+
   def handleStreamsInitialize(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val streamsInitializeRequest = request.body[StreamsGroupInitializeRequest]
 
     // TODO: Check ACLs on CREATE TOPIC & DESCRIBE_CONFIGS
 
-    if (!config.isNewGroupCoordinatorEnabled) {
+    if (!isStreamsGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the default). If the
       // new one is not enabled, we fail directly here.
       requestHelper.sendMaybeThrottle(request, streamsInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
@@ -3915,7 +3920,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleStreamsHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val streamsHeartbeatRequest = request.body[StreamsGroupHeartbeatRequest]
 
-    if (!config.isNewGroupCoordinatorEnabled) {
+    if (!isStreamsGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the default). If the
       // new one is not enabled, we fail directly here.
       requestHelper.sendMaybeThrottle(request, streamsHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
@@ -3937,6 +3942,63 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleStreamsGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
+    val streamsGroupDescribeRequest = request.body[StreamsGroupDescribeRequest]
+    val includeAuthorizedOperations = streamsGroupDescribeRequest.data.includeAuthorizedOperations
+
+    if (!isStreamsGroupProtocolEnabled()) {
+      // The API is not supported by the "old" group coordinator (the default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, request.body[StreamsGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val response = new StreamsGroupDescribeResponseData()
+
+      val authorizedGroups = new ArrayBuffer[String]()
+      streamsGroupDescribeRequest.data.groupIds.forEach { groupId =>
+        if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
+          response.groups.add(new StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+          )
+        } else {
+          authorizedGroups += groupId
+        }
+      }
+
+      groupCoordinator.streamsGroupDescribe(
+        request.context,
+        authorizedGroups.asJava
+      ).handle[Unit] { (results, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, streamsGroupDescribeRequest.getErrorResponse(exception))
+        } else {
+          if (includeAuthorizedOperations) {
+            results.forEach { groupResult =>
+              if (groupResult.errorCode == Errors.NONE.code) {
+                groupResult.setAuthorizedOperations(authHelper.authorizedOperations(
+                  request,
+                  new Resource(ResourceType.GROUP, groupResult.groupId)
+                ))
+              }
+            }
+          }
+
+          if (response.groups.isEmpty) {
+            // If the response is empty, we can directly reuse the results.
+            response.setGroups(results)
+          } else {
+            // Otherwise, we have to copy the results into the existing ones.
+            response.groups.addAll(results)
+          }
+
+          requestHelper.sendMaybeThrottle(request, new StreamsGroupDescribeResponse(response))
+        }
+      }
+    }
+
+  }
+
   def handleGetTelemetrySubscriptionsRequest(request: RequestChannel.Request): Unit = {
     val subscriptionRequest = request.body[GetTelemetrySubscriptionsRequest]
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 18327a59b55..3b9aa091b8a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -599,6 +599,13 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
       warn(s"Share groups and the new '${GroupType.SHARE}' rebalance protocol are enabled. " +
         "This is part of the early access of KIP-932 and MUST NOT be used in production.")
     }
+    if (protocols.contains(GroupType.STREAMS)) {
+      if (processRoles.isEmpty) {
+        throw new ConfigException(s"The new '${GroupType.STREAMS}' rebalance protocol is only supported in KRaft clusters.")
+      }
+      warn(s"The new '${GroupType.STREAMS}' rebalance protocol is enabled along with the new group coordinator. " +
+        "This is part of the preview of KIP-1071 and MUST NOT be used in production.")
+    }
     protocols
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 917c47aeea2..202149fe747 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11235,42 +11235,48 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testStreamsInitializeRequest(): Unit = {
-    val streamsInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group")
+  def testStreamsGroupInitializeRequest(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsInitializeRequest, true).build())
+    val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build())
 
     val future = new CompletableFuture[StreamsGroupInitializeResponseData]()
     when(groupCoordinator.streamsInitialize(
       requestChannelRequest.context,
-      streamsInitializeRequest
+      streamsGroupInitializeRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(overrideProperties = Map(
-      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
-    ))
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
+    )
     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
 
-    val streamsInitializeResponse = new StreamsGroupInitializeResponseData()
+    val streamsGroupInitializeResponse = new StreamsGroupInitializeResponseData()
 
-    future.complete(streamsInitializeResponse)
+    future.complete(streamsGroupInitializeResponse)
     val response = verifyNoThrottling[StreamsGroupInitializeResponse](requestChannelRequest)
-    assertEquals(streamsInitializeResponse, response.data)
+    assertEquals(streamsGroupInitializeResponse, response.data)
   }
 
   @Test
-  def testStreamsInitializeRequestFutureFailed(): Unit = {
-    val streamsInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group")
+  def testStreamsGroupInitializeRequestFutureFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
 
-    val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsInitializeRequest, true).build())
+    val streamsGroupInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build())
 
     val future = new CompletableFuture[StreamsGroupInitializeResponseData]()
     when(groupCoordinator.streamsInitialize(
       requestChannelRequest.context,
-      streamsInitializeRequest
+      streamsGroupInitializeRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(overrideProperties = Map(
-      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
-    ))
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
+    )
     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -11279,17 +11285,20 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testStreamsInitializeRequestAuthorizationFailed(): Unit = {
-    val streamsInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group")
+  def testStreamsGroupInitializeRequestAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupInitializeRequest = new StreamsGroupInitializeRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsInitializeRequest, true).build())
+    val requestChannelRequest = buildRequest(new StreamsGroupInitializeRequest.Builder(streamsGroupInitializeRequest, true).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
     kafkaApis = createKafkaApis(
       authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
 
@@ -11299,43 +11308,49 @@ class KafkaApisTest extends Logging {
 
 
   @Test
-  def testStreamsHeartbeatRequest(): Unit = {
-    val streamsHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+  def testStreamsGroupHeartbeatRequest(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
 
-    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResponseData]()
     when(groupCoordinator.streamsHeartbeat(
       requestChannelRequest.context,
-      streamsHeartbeatRequest
+      streamsGroupHeartbeatRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(overrideProperties = Map(
-      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
-    ))
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
+    )
     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
 
-    val streamsHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
+    val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
       .setMemberId("member")
 
-    future.complete(streamsHeartbeatResponse)
+    future.complete(streamsGroupHeartbeatResponse)
     val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
-    assertEquals(streamsHeartbeatResponse, response.data)
+    assertEquals(streamsGroupHeartbeatResponse, response.data)
   }
 
   @Test
-  def testStreamsHeartbeatRequestFutureFailed(): Unit = {
-    val streamsHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+  def testStreamsGroupHeartbeatRequestFutureFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
 
     val future = new CompletableFuture[StreamsGroupHeartbeatResponseData]()
     when(groupCoordinator.streamsHeartbeat(
       requestChannelRequest.context,
-      streamsHeartbeatRequest
+      streamsGroupHeartbeatRequest
     )).thenReturn(future)
-    kafkaApis = createKafkaApis(overrideProperties = Map(
-      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true"
-    ))
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
+    )
     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
 
     future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
@@ -11344,17 +11359,20 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testStreamsHeartbeatRequestAuthorizationFailed(): Unit = {
-    val streamsHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+  def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
 
-    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsHeartbeatRequest, true).build())
+    val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
+
+    val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
       .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
     kafkaApis = createKafkaApis(
       authorizer = Some(authorizer),
-      overrideProperties = Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true")
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
     )
     kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
 
@@ -11483,6 +11501,128 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+      .setIncludeAuthorizedOperations(includeAuthorizedOperations)
+    streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
+    val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+    val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.streamsGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    future.complete(List(
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+    ).asJava)
+
+    var authorizedOperationsInt = Int.MinValue;
+    if (includeAuthorizedOperations) {
+      authorizedOperationsInt = Utils.to32BitField(
+        AclEntry.supportedOperations(ResourceType.GROUP).asScala
+          .map(_.code.asInstanceOf[JByte]).asJava)
+    }
+
+    // Can't reuse the above list here because we would not test the implementation in KafkaApis then
+    val describedGroups = List(
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
+      new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
+    ).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
+    val expectedStreamsGroupDescribeResponseData = new StreamsGroupDescribeResponseData()
+      .setGroups(describedGroups.asJava)
+
+    val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+
+    assertEquals(expectedStreamsGroupDescribeResponseData, response.data)
+  }
+
+  @Test
+  def testStreamsGroupDescribeReturnsUnsupportedVersion(): Unit = {
+    metadataCache = mock(classOf[ZkMetadataCache])
+
+    val groupId = "group0"
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+    streamsGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+    val errorCode = Errors.UNSUPPORTED_VERSION.code
+    val expectedDescribedGroup = new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(errorCode)
+    val expectedResponse = new StreamsGroupDescribeResponseData()
+    expectedResponse.groups.add(expectedDescribedGroup)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+    val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+
+    assertEquals(expectedResponse, response.data)
+  }
+
+  @Test
+  def testStreamsGroupDescribeAuthorizationFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+    streamsGroupDescribeRequestData.groupIds.add("group-id")
+    val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+    val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.streamsGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    future.complete(List().asJava)
+    kafkaApis = createKafkaApis(
+      authorizer = Some(authorizer),
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.groups.get(0).errorCode)
+  }
+
+  @Test
+  def testStreamsGroupDescribeFutureFailed(): Unit = {
+    metadataCache = mock(classOf[KRaftMetadataCache])
+
+    val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
+    streamsGroupDescribeRequestData.groupIds.add("group-id")
+    val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
+
+    val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.streamsGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams"),
+      raftSupport = true
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    future.completeExceptionally(Errors.FENCED_MEMBER_EPOCH.exception)
+    val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
+    assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
+  }
+
   @Test
   def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = {
     val data = new GetTelemetrySubscriptionsRequestData()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1c5c2d3b0cd..28717aee87b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1938,6 +1938,12 @@ class KafkaConfigTest {
     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), config.groupCoordinatorRebalanceProtocols)
     assertTrue(config.isNewGroupCoordinatorEnabled)
     assertTrue(config.shareGroupConfig.isShareGroupEnabled)
+
+    // This is OK.
+    props.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,streams")
+    val config2 = KafkaConfig.fromProps(props)
+    assertEquals(Set(GroupType.CLASSIC, GroupType.STREAMS), config2.groupCoordinatorRebalanceProtocols)
+    assertTrue(config2.isNewGroupCoordinatorEnabled)
+    assertTrue(config2.isNewGroupCoordinatorEnabled)
   }
 
   @Test
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index b5a3c9bd96e..2b3c5b128d8 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -562,4 +562,9 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
         <Bug pattern="SING_SINGLETON_HAS_NONPRIVATE_CONSTRUCTOR"/>
     </Match>
 
+    <Match>
+        <Class name="org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder"/>
+        <Bug pattern="URF_UNREAD_FIELD"/>
+    </Match>
+
 </FindBugsFilter>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index f25b5c39461..cea5da08608 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
@@ -229,6 +230,19 @@ public interface GroupCoordinator {
         List<String> groupIds
     );
 
+    /**
+     * Describe streams groups.
+     *
+     * @param context           The coordinator request context.
+     * @param groupIds          The group ids.
+     *
+     * @return A future yielding the results or an exception.
+     */
    CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    );
+
     /**
      * Describe share groups.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index b2e2460064c..5d632528961 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
@@ -60,6 +61,7 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.utils.BufferSupplier;
@@ -723,6 +725,57 @@ public class GroupCoordinatorService implements GroupCoordinator {
         return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
     }
 
+    /**
+     * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}.
+     */
+    @Override
+    public CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> streamsGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            if (isGroupIdNotEmpty(groupId)) {
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>())
+                    .add(groupId);
+            } else {
+                futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new StreamsGroupDescribeResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                )));
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
+                runtime.scheduleReadOperation(
+                    "streams-group-describe",
+                    topicPartition,
+                    (coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupList, lastCommittedOffset)
+                ).exceptionally(exception -> handleOperationException(
+                    "streams-group-describe",
+                    groupList,
+                    exception,
+                    (error, __) -> StreamsGroupDescribeRequest.getErrorDescribedGroupList(groupList, error)
+                ));
+
+            futures.add(future);
+        });
+
+        return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll);
+    }
+
     /**
      * See {@link GroupCoordinator#shareGroupDescribe(RequestContext, List)}.
      */
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 4816649d9f6..dff55c9b684 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
@@ -645,6 +646,21 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return groupMetadataManager.consumerGroupDescribe(groupIds, 
committedOffset);
     }
 
+    /**
+     * Handles a StreamsGroupDescribe request.
+     *
+     * @param groupIds          The IDs of the groups to describe.
+     * @param committedOffset   A specified committed offset corresponding to this shard.
+     *
+     * @return A list containing the StreamsGroupDescribeResponseData.DescribedGroup.
+     */
+    public List<StreamsGroupDescribeResponseData.DescribedGroup> 
streamsGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        return groupMetadataManager.streamsGroupDescribe(groupIds, 
committedOffset);
+    }
+
     /**
      * Handles a ShareGroupDescribe request.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 1d95cad5a7e..e934304b168 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Endpoint;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.KeyValue;
@@ -863,6 +864,33 @@ public class GroupMetadataManager {
         return describedGroups;
     }
 
+    /**
+     * Handles a StreamsGroupDescribe request.
+     *
+     * @param groupIds          The IDs of the groups to describe.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A list containing the 
StreamsGroupDescribeResponseData.DescribedGroup.
+     */
+    public List<StreamsGroupDescribeResponseData.DescribedGroup> 
streamsGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        final List<StreamsGroupDescribeResponseData.DescribedGroup> 
describedGroups = new ArrayList<>();
+        groupIds.forEach(groupId -> {
+            try {
+                describedGroups.add(streamsGroup(groupId, 
committedOffset).asDescribedGroup(committedOffset));
+            } catch (GroupIdNotFoundException exception) {
+                describedGroups.add(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId(groupId)
+                    .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                );
+            }
+        });
+
+        return describedGroups;
+    }
+
     /**
      * Handles a DescribeGroup request.
      *
@@ -3839,10 +3867,8 @@ public class GroupMetadataManager {
         boolean staticMemberReplaced,
         List<CoordinatorRecord> records
     ) {
-        String preferredServerAssignor = group.computePreferredServerAssignor(
-            member,
-            updatedMember
-        ).orElse(defaultTaskAssignor.name());
+        // TODO: Read the preferred server assignor from the group 
configuration
+        String preferredServerAssignor = defaultTaskAssignor.name();
         try {
             org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder 
assignmentResultBuilder =
                 new 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(group.groupId(),
 groupEpoch, taskAssignors.get(preferredServerAssignor))
@@ -3851,7 +3877,6 @@ public class GroupMetadataManager {
                     .withStaticMembers(group.staticMembers())
                     .withSubscriptionMetadata(subscriptionMetadata)
                     .withTargetAssignment(group.targetAssignment())
-                    .withTopicsImage(metadataImage.topics())
                     .addOrUpdateMember(updatedMember.memberId(), 
updatedMember);
             
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 assignmentResult;
             // A new static member is replacing an older one with the same 
subscriptions.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
index 19eb79e36e6..f7690381708 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
@@ -182,7 +182,7 @@ public class CurrentAssignmentBuilder {
 
                 // If the member provides its owned tasks. We verify if it 
still
                 // owns any of the revoked tasks. If it does, we cannot 
progress.
-                if (ownsRevokedTasks(member.activeTasksPendingRevocation())) {
+                if 
(ownsRevokedActiveTasks(member.activeTasksPendingRevocation())) {
                     return member;
                 }
 
@@ -229,12 +229,12 @@ public class CurrentAssignmentBuilder {
     }
 
     /**
-     * Decides whether the current ownedTopicTasks contains any partition that 
is pending revocation.
+     * Decides whether the current ownedActiveTasks contains any task that is pending revocation.
      *
      * @param assignment The assignment that has the tasks pending revocation.
      * @return A boolean based on the condition mentioned above.
      */
-    private boolean ownsRevokedTasks(
+    private boolean ownsRevokedActiveTasks(
         Map<String, Set<Integer>> assignment
     ) {
         if (ownedActiveTasks == null) {
@@ -313,7 +313,7 @@ public class CurrentAssignmentBuilder {
             }
         }
 
-        if (!newTasksPendingRevocation.isEmpty() && 
ownsRevokedTasks(newTasksPendingRevocation)) {
+        if (!newTasksPendingRevocation.isEmpty() && 
ownsRevokedActiveTasks(newTasksPendingRevocation)) {
             // If there are tasks to be revoked, the member remains in its 
current
             // epoch and requests the revocation of those tasks. It 
transitions to
             // the UNREVOKED_TASKS state to wait until the client acknowledges 
the
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 09dcf7817a5..1d4c2ace5e1 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.coordinator.group.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.Group;
@@ -131,11 +132,6 @@ public class StreamsGroup implements Group {
      */
     private final TimelineHashMap<String, String> staticMembers;
 
-    /**
-     * The number of members supporting each assignor name.
-     */
-    private final TimelineHashMap<String, Integer> assignors;
-
     /**
      * The metadata associated with each subscribed topic name.
      */
@@ -175,7 +171,7 @@ public class StreamsGroup implements Group {
     /**
      * The Streams topology.
      */
-    private TimelineObject<Optional<StreamsTopology>> topology;
+    private final TimelineObject<Optional<StreamsTopology>> topology;
 
     /**
      * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with the group epoch at the time of setting it.
@@ -197,7 +193,6 @@ public class StreamsGroup implements Group {
         this.groupEpoch = new TimelineInteger(snapshotRegistry);
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.assignors = new TimelineHashMap<>(snapshotRegistry, 0);
         this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 
0);
         this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -369,7 +364,6 @@ public class StreamsGroup implements Group {
         maybeUpdateTaskEpoch(oldMember, newMember);
         updateStaticMember(newMember);
         maybeUpdateGroupState();
-        maybeUpdateAssignors(oldMember, newMember);
     }
 
     /**
@@ -461,36 +455,6 @@ public class StreamsGroup implements Group {
         return 
Collections.unmodifiableMap(invertedTargetWarmupTasksAssignment);
     }
 
-    /**
-     * Updates the server assignors count.
-     *
-     * @param oldMember The old member.
-     * @param newMember The new member.
-     */
-    private void maybeUpdateAssignors(
-        StreamsGroupMember oldMember,
-        StreamsGroupMember newMember
-    ) {
-        maybeUpdateAssignors(assignors, oldMember, newMember);
-    }
-
-    private static void maybeUpdateAssignors(
-        Map<String, Integer> serverAssignorCount,
-        StreamsGroupMember oldMember,
-        StreamsGroupMember newMember
-    ) {
-        if (oldMember != null) {
-            oldMember.assignor().ifPresent(name ->
-                serverAssignorCount.compute(name, StreamsGroup::decValue)
-            );
-        }
-        if (newMember != null) {
-            newMember.assignor().ifPresent(name ->
-                serverAssignorCount.compute(name, StreamsGroup::incValue)
-            );
-        }
-    }
-
     /**
      * Updates the target assignment of a member.
      *
@@ -656,43 +620,6 @@ public class StreamsGroup implements Group {
         }
     }
 
-    /**
-     * Compute the preferred (server side) assignor for the group while taking 
into account the updated member. The computation relies on
-     * {{@link StreamsGroup#assignors}} persisted structure but it does not 
update it.
-     *
-     * @param oldMember The old member.
-     * @param newMember The new member.
-     * @return An Optional containing the preferred assignor.
-     */
-    public Optional<String> computePreferredServerAssignor(
-        StreamsGroupMember oldMember,
-        StreamsGroupMember newMember
-    ) {
-        // Copy the current count and update it.
-        Map<String, Integer> counts = new HashMap<>(this.assignors);
-        maybeUpdateAssignors(counts, oldMember, newMember);
-
-        return counts.entrySet().stream()
-            .max(Map.Entry.comparingByValue())
-            .map(Map.Entry::getKey);
-    }
-
-    /**
-     * @return The preferred assignor for the group.
-     */
-    public Optional<String> preferredServerAssignor() {
-        return preferredServerAssignor(Long.MAX_VALUE);
-    }
-
-    /**
-     * @return The preferred assignor for the group with given offset.
-     */
-    public Optional<String> preferredServerAssignor(long committedOffset) {
-        return assignors.entrySet(committedOffset).stream()
-            .max(Map.Entry.comparingByValue())
-            .map(Map.Entry::getKey);
-    }
-
     /**
      * @return An immutable Map of subscription metadata for each topic that 
the consumer group is subscribed to.
      */
@@ -1097,21 +1024,23 @@ public class StreamsGroup implements Group {
         });
     }
 
-    /**
-     * Decrements value by 1; returns null when reaching zero. This helper is 
meant to be used with Map#compute.
-     */
-    private static Integer decValue(String key, Integer value) {
-        if (value == null) {
-            return null;
-        }
-        return value == 1 ? null : value - 1;
-    }
-
-    /**
-     * Increments value by 1; This helper is meant to be used with Map#compute.
-     */
-    private static Integer incValue(String key, Integer value) {
-        return value == null ? 1 : value + 1;
+    public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
+        long committedOffset
+    ) {
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setGroupEpoch(groupEpoch.get(committedOffset))
+            .setGroupState(state.get(committedOffset).toString())
+            .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
+            
.setTopology(topology.get(committedOffset).map(StreamsTopology::asStreamsGroupDescribeTopology).orElse(null));
+        members.entrySet(committedOffset).forEach(
+            entry -> describedGroup.members().add(
+                entry.getValue().asStreamsGroupDescribeMember(
+                    targetAssignment.get(entry.getValue().memberId(), 
committedOffset)
+                )
+            )
+        );
+        return describedGroup;
     }
 
     /**
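
    As an aside, asDescribedGroup reads every field at the same
    committedOffset, so the response is a consistent snapshot even while
    newer, not-yet-committed state exists in the timeline structures. A toy
    illustration of offset-versioned reads; the TreeMap here is a stand-in for
    the timeline data structures, not their actual implementation:

        import java.util.Map;
        import java.util.TreeMap;

        public class TimelineValueSketch {

            // Toy stand-in for a TimelineObject: values keyed by the offset at which they were written.
            private final TreeMap<Long, String> history = new TreeMap<>();

            void set(long offset, String value) {
                history.put(offset, value);
            }

            // Read the latest value written at or before the committed offset.
            String get(long committedOffset) {
                Map.Entry<Long, String> entry = history.floorEntry(committedOffset);
                return entry == null ? null : entry.getValue();
            }

            public static void main(String[] args) {
                TimelineValueSketch state = new TimelineValueSketch();
                state.set(5L, "Empty");
                state.set(12L, "Assigning");
                // A describe at committed offset 10 still sees the older value.
                System.out.println(state.get(10L)); // prints "Empty"
            }
        }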
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 5ad4370a5ab..e50e450feb4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -16,12 +16,9 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
-import org.apache.kafka.image.TopicImage;
-import org.apache.kafka.image.TopicsImage;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -40,7 +37,7 @@ import java.util.stream.Collectors;
  */
 public class StreamsGroupMember {
 
     /**
      * A builder that facilitates the creation of a new member or the update 
of an existing one.
      * <p>
      * Please refer to the javadoc of {{@link StreamsGroupMember}} for the 
definition of the fields.
@@ -57,10 +54,9 @@ public class StreamsGroupMember {
         private String clientId = "";
         private String clientHost = "";
         private String topologyId;
-        private String assignor;
         private String processId;
         private StreamsGroupMemberMetadataValue.Endpoint userEndpoint;
-        private Map<String, String> clientTags;
+        private Map<String, String> clientTags = Collections.emptyMap();
         private Map<String, Set<Integer>> assignedActiveTasks = 
Collections.emptyMap();
         private Map<String, Set<Integer>> assignedStandbyTasks = 
Collections.emptyMap();
         private Map<String, Set<Integer>> assignedWarmupTasks = 
Collections.emptyMap();
@@ -82,7 +78,6 @@ public class StreamsGroupMember {
             this.clientId = member.clientId;
             this.clientHost = member.clientHost;
             this.topologyId = member.topologyId;
-            this.assignor = member.assignor;
             this.processId = member.processId;
             this.userEndpoint = member.userEndpoint;
             this.clientTags = member.clientTags;
@@ -140,11 +135,6 @@ public class StreamsGroupMember {
             return this;
         }
 
-        public StreamsGroupMember.Builder maybeUpdateAssignor(Optional<String> 
assignor) {
-            this.assignor = assignor.orElse(this.assignor);
-            return this;
-        }
-
         public Builder setClientId(String clientId) {
             this.clientId = clientId;
             return this;
@@ -170,11 +160,6 @@ public class StreamsGroupMember {
             return this;
         }
 
-        public Builder setAssignor(String assignor) {
-            this.assignor = assignor;
-            return this;
-        }
-
         public Builder setProcessId(String processId) {
             this.processId = processId;
             return this;
@@ -271,7 +256,6 @@ public class StreamsGroupMember {
                 clientId,
                 clientHost,
                 topologyId,
-                assignor,
                 processId,
                 userEndpoint,
                 clientTags,
@@ -334,11 +318,6 @@ public class StreamsGroupMember {
      */
     private final String topologyId;
 
-    /**
-     * The assignor
-     */
-    private final String assignor;
-
     /**
      * The process ID
      */
@@ -385,7 +364,6 @@ public class StreamsGroupMember {
         String clientId,
         String clientHost,
         String topologyId,
-        String assignor,
         String processId,
         StreamsGroupMemberMetadataValue.Endpoint userEndpoint,
         Map<String, String> clientTags,
@@ -405,7 +383,6 @@ public class StreamsGroupMember {
         this.clientId = clientId;
         this.clientHost = clientHost;
         this.topologyId = topologyId;
-        this.assignor = assignor;
         this.processId = processId;
         this.userEndpoint = userEndpoint;
         this.clientTags = clientTags;
@@ -478,13 +455,6 @@ public class StreamsGroupMember {
         return topologyId;
     }
 
-    /**
-     * @return The assignor
-     */
-    public Optional<String> assignor() {
-        return Optional.ofNullable(assignor);
-    }
-
     /**
      * @return The process ID
      */
@@ -548,33 +518,57 @@
         return activeTasksPendingRevocation;
     }
 
-    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromMap(
-        Map<Uuid, Set<Integer>> partitions,
-        TopicsImage topicsImage
+    /**
+     * @param targetAssignment The target assignment of this member in the 
corresponding group.
+     *
+     * @return The StreamsGroupMember mapped as 
StreamsGroupDescribeResponseData.Member.
+     */
+    public StreamsGroupDescribeResponseData.Member 
asStreamsGroupDescribeMember(
+        Assignment targetAssignment
     ) {
-        List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitions = new ArrayList<>();
-        partitions.forEach((topicId, partitionSet) -> {
-            String topicName = lookupTopicNameById(topicId, topicsImage);
-            if (topicName != null) {
-                topicPartitions.add(new 
ConsumerGroupDescribeResponseData.TopicPartitions()
-                    .setTopicId(topicId)
-                    .setTopicName(topicName)
-                    .setPartitions(new ArrayList<>(partitionSet)));
-            }
-        });
-        return topicPartitions;
+        final StreamsGroupDescribeResponseData.Assignment 
describedTargetAssignment =
+            new StreamsGroupDescribeResponseData.Assignment();
+
+        if (targetAssignment != null) {
+            describedTargetAssignment
+                .setActiveTasks(taskIdsFromMap(targetAssignment.activeTasks()))
+                
.setStandbyTasks(taskIdsFromMap(targetAssignment.standbyTasks()))
+                
.setWarmupTasks(taskIdsFromMap(targetAssignment.warmupTasks()));
+        }
+
+        // TODO: TaskOffset and TaskEndOffset are missing.
+        return new StreamsGroupDescribeResponseData.Member()
+            .setMemberEpoch(memberEpoch)
+            .setMemberId(memberId)
+            .setAssignment(
+                new StreamsGroupDescribeResponseData.Assignment()
+                    .setActiveTasks(taskIdsFromMap(assignedActiveTasks))
+                    .setStandbyTasks(taskIdsFromMap(assignedStandbyTasks))
+                    .setWarmupTasks(taskIdsFromMap(assignedWarmupTasks)))
+            .setTargetAssignment(describedTargetAssignment)
+            .setClientHost(clientHost)
+            .setClientId(clientId)
+            .setInstanceId(instanceId)
+            .setRackId(rackId)
+            .setClientTags(clientTags.entrySet().stream().map(
+                entry -> new StreamsGroupDescribeResponseData.KeyValue()
+                    .setKey(entry.getKey())
+                    .setValue(entry.getValue())
+            ).collect(Collectors.toList()))
+            .setProcessId(processId)
+            .setTopologyId(topologyId);
     }
 
-    private static String lookupTopicNameById(
-        Uuid topicId,
-        TopicsImage topicsImage
+    private static List<StreamsGroupDescribeResponseData.TaskIds> 
taskIdsFromMap(
+        Map<String, Set<Integer>> tasks
     ) {
-        TopicImage topicImage = topicsImage.getTopic(topicId);
-        if (topicImage != null) {
-            return topicImage.name();
-        } else {
-            return null;
-        }
+        List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new 
ArrayList<>();
+        tasks.forEach((subtopologyId, partitionSet) -> {
+            taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
+                .setSubtopology(subtopologyId)
+                .setPartitions(new ArrayList<>(partitionSet)));
+        });
+        return taskIds;
     }
 
     @SuppressWarnings("checkstyle:CyclomaticComplexity")
@@ -597,7 +592,6 @@ public class StreamsGroupMember {
             && Objects.equals(clientId, that.clientId)
             && Objects.equals(clientHost, that.clientHost)
             && Objects.deepEquals(topologyId, that.topologyId)
-            && Objects.equals(assignor, that.assignor)
             && Objects.equals(processId, that.processId)
             && Objects.equals(userEndpoint, that.userEndpoint)
             && Objects.equals(clientTags, that.clientTags)
@@ -619,7 +613,6 @@ public class StreamsGroupMember {
         result = 31 * result + Objects.hashCode(clientId);
         result = 31 * result + Objects.hashCode(clientHost);
         result = 31 * result + Objects.hashCode(topologyId);
-        result = 31 * result + Objects.hashCode(assignor);
         result = 31 * result + Objects.hashCode(processId);
         result = 31 * result + Objects.hashCode(userEndpoint);
         result = 31 * result + Objects.hashCode(clientTags);
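
    As an aside, taskIdsFromMap above is a straight reshaping of the internal
    assignment map into the wire type, one TaskIds entry per subtopology. A
    minimal sketch of the same transformation, with a plain pair standing in
    for the generated StreamsGroupDescribeResponseData.TaskIds class:

        import java.util.AbstractMap.SimpleEntry;
        import java.util.ArrayList;
        import java.util.List;
        import java.util.Map;
        import java.util.Set;

        public class TaskIdsSketch {

            // Stand-in for TaskIds: (subtopologyId, partitions).
            static List<SimpleEntry<String, List<Integer>>> taskIdsFromMap(Map<String, Set<Integer>> tasks) {
                List<SimpleEntry<String, List<Integer>>> taskIds = new ArrayList<>();
                tasks.forEach((subtopologyId, partitions) ->
                    taskIds.add(new SimpleEntry<>(subtopologyId, new ArrayList<>(partitions))));
                return taskIds;
            }

            public static void main(String[] args) {
                System.out.println(taskIdsFromMap(Map.of("subtopology-0", Set.of(0, 1, 2))));
            }
        }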
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
index e0bec1c292a..5f09941fae9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
+import java.util.List;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
@@ -88,4 +90,35 @@ public class StreamsTopology {
             ", subtopologies=" + subtopologies +
             '}';
     }
+
+    public List<StreamsGroupDescribeResponseData.Subtopology> 
asStreamsGroupDescribeTopology() {
+        return subtopologies.values().stream().map(
+            subtopology -> new StreamsGroupDescribeResponseData.Subtopology()
+                .setSourceTopicRegex(subtopology.sourceTopicRegex())
+                .setSubtopology(subtopology.subtopology())
+                .setSourceTopics(subtopology.sourceTopics())
+                .setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
+                .setRepartitionSourceTopics(
+                    
asStreamsGroupDescribeTopicInfo(subtopology.repartitionSourceTopics()))
+                .setStateChangelogTopics(
+                    
asStreamsGroupDescribeTopicInfo(subtopology.stateChangelogTopics()))
+        ).collect(Collectors.toList());
+    }
+
+    private static List<StreamsGroupDescribeResponseData.TopicInfo> 
asStreamsGroupDescribeTopicInfo(
+        final List<TopicInfo> topicInfos) {
+        return topicInfos.stream().map(x ->
+            new StreamsGroupDescribeResponseData.TopicInfo()
+                .setName(x.name())
+                .setPartitions(x.partitions())
+                .setTopicConfigs(
+                    x.topicConfigs() != null ?
+                        x.topicConfigs().stream().map(
+                            y -> new 
StreamsGroupDescribeResponseData.KeyValue()
+                                .setKey(y.key())
+                                .setValue(y.value())
+                        ).collect(Collectors.toList()) : null
+                )
+        ).collect(Collectors.toList());
+    }
 }
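
    As an aside, the topicConfigs mapping above deliberately preserves null,
    since the diff treats the field as nullable in the response. A compact
    null-safe variant of the same transformation, using Optional purely for
    illustration and Map.Entry as a stand-in for the generated KeyValue class:

        import java.util.List;
        import java.util.Map;
        import java.util.Optional;
        import java.util.stream.Collectors;

        public class NullSafeMappingSketch {

            // Stand-in for TopicInfo#topicConfigs(): a possibly-null list of (key, value) pairs.
            static List<Map.Entry<String, String>> mapConfigs(List<Map.Entry<String, String>> configs) {
                return Optional.ofNullable(configs)
                    .map(list -> list.stream()
                        .map(e -> Map.entry(e.getKey(), e.getValue()))
                        .collect(Collectors.toList()))
                    .orElse(null);
            }
        }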
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
index 1fd34e27e89..51e07c3418c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -23,7 +23,6 @@ import 
org.apache.kafka.coordinator.group.taskassignor.GroupSpecImpl;
 import org.apache.kafka.coordinator.group.taskassignor.MemberAssignment;
 import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor;
 import org.apache.kafka.coordinator.group.taskassignor.TaskAssignorException;
-import org.apache.kafka.image.TopicsImage;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -115,11 +114,6 @@ public class TargetAssignmentBuilder {
      */
     private Map<String, org.apache.kafka.coordinator.group.streams.Assignment> 
targetAssignment = Collections.emptyMap();
 
-    /**
-     * The topics image.
-     */
-    private TopicsImage topicsImage = TopicsImage.EMPTY;
-
     /**
      * The topology.
      */
@@ -204,19 +198,6 @@ public class TargetAssignmentBuilder {
         return this;
     }
 
-    /**
-     * Adds the topics image.
-     *
-     * @param topicsImage The topics image.
-     * @return This object.
-     */
-    public TargetAssignmentBuilder withTopicsImage(
-        TopicsImage topicsImage
-    ) {
-        this.topicsImage = topicsImage;
-        return this;
-    }
-
     /**
      * Adds the topology image.
      *
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index fcdd221c907..5b1aad09027 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -54,6 +54,7 @@ import 
org.apache.kafka.common.message.OffsetFetchResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
@@ -1771,6 +1772,143 @@ public class GroupCoordinatorServiceTest {
         );
     }
 
+    @Test
+    public void testStreamsGroupDescribe() throws InterruptedException, 
ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        int partitionCount = 2;
+        service.startup(() -> partitionCount);
+
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup1 = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("group-id-1");
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup2 = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("group-id-2");
+        List<StreamsGroupDescribeResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            describedGroup1,
+            describedGroup2
+        );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("streams-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+        CompletableFuture<Object> describedGroupFuture = new 
CompletableFuture<>();
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("streams-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        )).thenReturn(describedGroupFuture);
+
+        
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future 
=
+            
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), 
Arrays.asList("group-id-1", "group-id-2"));
+
+        assertFalse(future.isDone());
+        
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+        assertEquals(expectedDescribedGroups, future.get());
+    }
+
+    @Test
+    public void testStreamsGroupDescribeInvalidGroupId() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        int partitionCount = 1;
+        service.startup(() -> partitionCount);
+
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+        List<StreamsGroupDescribeResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(null)
+                .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            describedGroup
+        );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("streams-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup)));
+
+        
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future 
=
+            
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), 
Arrays.asList("", null));
+
+        assertEquals(expectedDescribedGroups, future.get());
+    }
+
+    @Test
+    public void testStreamsGroupDescribeCoordinatorLoadInProgress() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        int partitionCount = 1;
+        service.startup(() -> partitionCount);
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("streams-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future 
=
+            
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), 
Collections.singletonList("group-id"));
+
+        assertEquals(
+            Collections.singletonList(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("group-id")
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+            ),
+            future.get()
+        );
+    }
+
+    @Test
+    public void testStreamsGroupDescribeCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("streams-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            Errors.COORDINATOR_NOT_AVAILABLE.exception()
+        ));
+
+        
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future 
=
+            
service.streamsGroupDescribe(requestContext(ApiKeys.STREAMS_GROUP_DESCRIBE), 
Collections.singletonList("group-id"));
+
+        assertEquals(
+            Collections.singletonList(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("group-id")
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            ),
+            future.get()
+        );
+    }
+
     @Test
     public void testDeleteOffsets() throws Exception {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 2adadc1be4d..e7fffb90c8c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -100,7 +100,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings({"ClassFanOutComplexity"})
+@SuppressWarnings("ClassFanOutComplexity")
 public class GroupCoordinatorShardTest {
 
     @Test
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4602593b6e3..bbbfe346539 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
@@ -90,6 +91,10 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpress
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
+import 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
@@ -8839,6 +8844,113 @@ public class GroupMetadataManagerTest {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testStreamsGroupDescribeNoErrors() {
+        List<String> streamsGroupIds = Arrays.asList("group-id-1", 
"group-id-2");
+        int epoch = 10;
+        String memberId = "member-id";
+        StreamsGroupMember.Builder memberBuilder = new 
StreamsGroupMember.Builder(memberId)
+            .setClientTags(Collections.singletonMap("clientTag", 
"clientValue"))
+            .setProcessId("processId")
+            .setMemberEpoch(epoch)
+            .setPreviousMemberEpoch(epoch - 1);
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), 
epoch))
+            .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), 
epoch)
+                .withMember(memberBuilder.build()))
+            .build();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> expected = 
Arrays.asList(
+            new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupEpoch(epoch)
+                .setGroupId(streamsGroupIds.get(0))
+                .setGroupState(StreamsGroup.StreamsGroupState.EMPTY.toString())
+                .setAssignmentEpoch(0),
+            new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupEpoch(epoch)
+                .setGroupId(streamsGroupIds.get(1))
+                .setMembers(Collections.singletonList(
+                    memberBuilder.build().asStreamsGroupDescribeMember(
+                        new 
org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap())
+                    )
+                ))
+                
.setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
+        );
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.sendStreamsGroupDescribe(streamsGroupIds);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testStreamsGroupDescribeWithErrors() {
+        String groupId = "groupId";
+
+        MockTaskAssignor assignor = new MockTaskAssignor("mock");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withTaskAssignors(Collections.singletonList(assignor))
+            .build();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.sendStreamsGroupDescribe(Collections.singletonList(groupId));
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());
+        List<StreamsGroupDescribeResponseData.DescribedGroup> expected = 
Collections.singletonList(
+            describedGroup
+        );
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testStreamsGroupDescribeBeforeAndAfterCommittingOffset() {
+        String streamsGroupId = "streamsGroupId";
+        int epoch = 10;
+        String memberId1 = "memberId1";
+        String memberId2 = "memberId2";
+        String subtopologyId = "subtopology1";
+
+        MockTaskAssignor assignor = new MockTaskAssignor("mock");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withTaskAssignors(Collections.singletonList(assignor))
+            .build();
+
+        StreamsGroupMember.Builder memberBuilder1 = new 
StreamsGroupMember.Builder(memberId1);
+        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
 memberBuilder1.build()));
+        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId,
 epoch + 1));
+
+        Map<String, Set<Integer>> assignmentMap = new HashMap<>();
+        assignmentMap.put(subtopologyId, Collections.emptySet());
+
+        StreamsGroupMember.Builder memberBuilder2 = new 
StreamsGroupMember.Builder(memberId2);
+        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
 memberBuilder2.build()));
+        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(streamsGroupId,
 memberId2, assignmentMap, assignmentMap, assignmentMap));
+        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(streamsGroupId,
 memberBuilder2.build()));
+        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId,
 epoch + 2));
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.groupMetadataManager.streamsGroupDescribe(Collections.singletonList(streamsGroupId),
 context.lastCommittedOffset);
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(streamsGroupId)
+            .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());
+        assertEquals(1, actual.size());
+        assertEquals(describedGroup, actual.get(0));
+
+        // Commit the offset and test again
+        context.commit();
+
+        actual = 
context.groupMetadataManager.streamsGroupDescribe(Collections.singletonList(streamsGroupId),
 context.lastCommittedOffset);
+        describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(streamsGroupId)
+            .setMembers(Arrays.asList(
+                memberBuilder1.build().asStreamsGroupDescribeMember(new 
org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap())),
+                memberBuilder2.build().asStreamsGroupDescribeMember(new 
org.apache.kafka.coordinator.group.streams.Assignment(assignmentMap, 
assignmentMap, assignmentMap))
+            ))
+            .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
+            .setGroupEpoch(epoch + 2);
+        assertEquals(1, actual.size());
+        assertEquals(describedGroup, actual.get(0));
+    }
+
     @Test
     public void testDescribeGroupStable() {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 7e38f61ffd5..45654736316 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.message.ListGroupsResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
@@ -88,6 +89,18 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
@@ -96,6 +109,7 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
 import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -139,6 +153,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 
+@SuppressWarnings("CyclomaticComplexity")
 public class GroupMetadataManagerTestContext {
     static final String DEFAULT_CLIENT_ID = "client";
     static final InetAddress DEFAULT_CLIENT_ADDRESS = 
InetAddress.getLoopbackAddress();
@@ -417,6 +432,9 @@ public class GroupMetadataManagerTestContext {
         private final List<ConsumerGroupBuilder> consumerGroupBuilders = new 
ArrayList<>();
         private int consumerGroupMaxSize = Integer.MAX_VALUE;
         private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
+        private final List<StreamsGroupBuilder> streamsGroupBuilders = new 
ArrayList<>();
+        private int streamsGroupMaxSize = Integer.MAX_VALUE;
+        private int streamsGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
         private int classicGroupMaxSize = Integer.MAX_VALUE;
         private int classicGroupInitialRebalanceDelayMs = 3000;
         private final int classicGroupNewMemberJoinTimeoutMs = 5 * 60 * 1000;
@@ -428,7 +446,7 @@ public class GroupMetadataManagerTestContext {
         private ShareGroupPartitionAssignor shareGroupAssignor = new 
MockPartitionAssignor("share");
         private final List<ShareGroupBuilder> shareGroupBuilders = new 
ArrayList<>();
         private int shareGroupMaxSize = Integer.MAX_VALUE;
-        private List<TaskAssignor> taskAssignors = 
Collections.singletonList(new MockTaskAssignor("mock"));
+        private List<TaskAssignor> streamsGroupAssignors = 
Collections.singletonList(new MockTaskAssignor("mock"));
 
         public Builder withMetadataImage(MetadataImage metadataImage) {
             this.metadataImage = metadataImage;
@@ -485,6 +503,26 @@ public class GroupMetadataManagerTestContext {
             return this;
         }
 
+        public Builder withTaskAssignors(List<TaskAssignor> assignors) {
+            this.streamsGroupAssignors = assignors;
+            return this;
+        }
+
+        public Builder withStreamsGroup(StreamsGroupBuilder builder) {
+            this.streamsGroupBuilders.add(builder);
+            return this;
+        }
+
+        public Builder withStreamsGroupMaxSize(int streamsGroupMaxSize) {
+            this.streamsGroupMaxSize = streamsGroupMaxSize;
+            return this;
+        }
+
+        public Builder withStreamsGroupMetadataRefreshIntervalMs(int 
streamsGroupMetadataRefreshIntervalMs) {
+            this.streamsGroupMetadataRefreshIntervalMs = 
streamsGroupMetadataRefreshIntervalMs;
+            return this;
+        }
+
         public Builder withShareGroupAssignor(ShareGroupPartitionAssignor 
shareGroupAssignor) {
             this.shareGroupAssignor = shareGroupAssignor;
             return this;
@@ -528,7 +566,11 @@ public class GroupMetadataManagerTestContext {
                     .withShareGroupAssignor(shareGroupAssignor)
                     .withShareGroupMaxSize(shareGroupMaxSize)
                     .withGroupConfigManager(groupConfigManager)
-                    .withStreamsGroupAssignors(taskAssignors)
+                    .withStreamsGroupHeartbeatInterval(5000)
+                    .withStreamsGroupSessionTimeout(45000)
+                    .withStreamsGroupMaxSize(streamsGroupMaxSize)
+                    .withStreamsGroupAssignors(streamsGroupAssignors)
+                    
.withStreamsGroupMetadataRefreshIntervalMs(streamsGroupMetadataRefreshIntervalMs)
                     .build(),
                 groupConfigManager,
                 classicGroupInitialRebalanceDelayMs,
@@ -538,6 +580,8 @@ public class GroupMetadataManagerTestContext {
             consumerGroupBuilders.forEach(builder -> 
builder.build(metadataImage.topics()).forEach(context::replay));
             shareGroupBuilders.forEach(builder -> 
builder.build(metadataImage.topics()).forEach(context::replay));
 
+            streamsGroupBuilders.forEach(builder -> 
builder.build().forEach(context::replay));
+
             context.commit();
 
             return context;
@@ -1354,6 +1398,10 @@ public class GroupMetadataManagerTestContext {
         return groupMetadataManager.consumerGroupDescribe(groupIds, 
lastCommittedOffset);
     }
 
+    public List<StreamsGroupDescribeResponseData.DescribedGroup> 
sendStreamsGroupDescribe(List<String> groupIds) {
+        return groupMetadataManager.streamsGroupDescribe(groupIds, 
lastCommittedOffset);
+    }
+
     public List<DescribeGroupsResponseData.DescribedGroup> 
describeGroups(List<String> groupIds) {
         return groupMetadataManager.describeGroups(groupIds, 
lastCommittedOffset);
     }
@@ -1653,6 +1701,48 @@ public class GroupMetadataManagerTestContext {
                 );
                 break;
 
+            case 
StreamsGroupCurrentMemberAssignmentKey.HIGHEST_SUPPORTED_VERSION:
+                groupMetadataManager.replay(
+                    (StreamsGroupCurrentMemberAssignmentKey) key.message(),
+                    (StreamsGroupCurrentMemberAssignmentValue) 
messageOrNull(value)
+                );
+                break;
+
+            case StreamsGroupMemberMetadataKey.HIGHEST_SUPPORTED_VERSION:
+                groupMetadataManager.replay(
+                    (StreamsGroupMemberMetadataKey) key.message(),
+                    (StreamsGroupMemberMetadataValue) messageOrNull(value)
+                );
+                break;
+
+            case StreamsGroupMetadataKey.HIGHEST_SUPPORTED_VERSION:
+                groupMetadataManager.replay(
+                    (StreamsGroupMetadataKey) key.message(),
+                    (StreamsGroupMetadataValue) messageOrNull(value)
+                );
+                break;
+
+            case StreamsGroupPartitionMetadataKey.HIGHEST_SUPPORTED_VERSION:
+                groupMetadataManager.replay(
+                    (StreamsGroupPartitionMetadataKey) key.message(),
+                    (StreamsGroupPartitionMetadataValue) messageOrNull(value)
+                );
+                break;
+
+            case 
StreamsGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION:
+                groupMetadataManager.replay(
+                    (StreamsGroupTargetAssignmentMemberKey) key.message(),
+                    (StreamsGroupTargetAssignmentMemberValue) 
messageOrNull(value)
+                );
+                break;
+
+            case 
StreamsGroupTargetAssignmentMetadataKey.HIGHEST_SUPPORTED_VERSION:
+                groupMetadataManager.replay(
+                    (StreamsGroupTargetAssignmentMetadataKey) key.message(),
+                    (StreamsGroupTargetAssignmentMetadataValue) 
messageOrNull(value)
+                );
+                break;
+
             case StreamsGroupTopologyKey.HIGHEST_SUPPORTED_VERSION:
                 groupMetadataManager.replay(
                     (StreamsGroupTopologyKey) key.message(),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
new file mode 100644
index 00000000000..b07dabc05e1
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.coordinator.group.CoordinatorRecord;
+
+public class StreamsGroupBuilder {
+
+    private final String groupId;
+    private final int groupEpoch;
+    private int assignmentEpoch;
+    private final Map<String, StreamsGroupMember> members = new HashMap<>();
+    private final Map<String, Assignment> assignments = new HashMap<>();
+    private Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
+
+    public StreamsGroupBuilder(String groupId, int groupEpoch) {
+        this.groupId = groupId;
+        this.groupEpoch = groupEpoch;
+        this.assignmentEpoch = 0;
+    }
+
+    public StreamsGroupBuilder withMember(StreamsGroupMember member) {
+        this.members.put(member.memberId(), member);
+        return this;
+    }
+
+    public StreamsGroupBuilder withPartitionMetadata(
+        Map<String, TopicMetadata> partitionMetadata) {
+        this.partitionMetadata = partitionMetadata;
+        return this;
+    }
+
+    public StreamsGroupBuilder withAssignment(String memberId,
+        Map<String, Set<Integer>> assignment) {
+        this.assignments.put(memberId, new Assignment(assignment));
+        return this;
+    }
+
+    public StreamsGroupBuilder withAssignmentEpoch(int assignmentEpoch) {
+        this.assignmentEpoch = assignmentEpoch;
+        return this;
+    }
+
+    public List<CoordinatorRecord> build() {
+        List<CoordinatorRecord> records = new ArrayList<>();
+
+        // Add records for members.
+        members.forEach((memberId, member) ->
+            records.add(
+                CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId, member))
+        );
+
+        if (!partitionMetadata.isEmpty()) {
+            records.add(
+                CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId,
+                    partitionMetadata));
+        }
+
+        // Add group epoch record.
+        records.add(
+            CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch));
+
+        // Add target assignment records.
+        assignments.forEach((memberId, assignment) ->
+            records.add(
+                CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(groupId, memberId,
+                    assignment.activeTasks(), assignment.standbyTasks(), assignment.warmupTasks()))
+        );
+
+        // Add target assignment epoch.
+        records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId,
+            assignmentEpoch));
+
+        // Add current assignment records for members.
+        members.forEach((memberId, member) ->
+            records.add(
+                CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(groupId, member))
+        );
+
+        return records;
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index f4e4eccdaa0..35ab964d1b6 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue.TaskIds;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
@@ -50,7 +55,6 @@ public class StreamsGroupMemberTest {
             .setClientId("client-id")
             .setClientHost("hostname")
             .setTopologyId("topology-hash")
-            .setAssignor("assignor")
             .setProcessId("process-id")
             .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090))
             .setClientTags(mkMap(mkEntry("client", "tag")))
@@ -68,7 +72,6 @@ public class StreamsGroupMemberTest {
         assertEquals("client-id", member.clientId());
         assertEquals("hostname", member.clientHost());
         assertEquals("topology-hash", member.topologyId());
-        assertEquals("assignor", member.assignor().get());
         assertEquals("process-id", member.processId());
         assertEquals(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090), member.userEndpoint());
         assertEquals(
@@ -107,7 +110,6 @@ public class StreamsGroupMemberTest {
             .setClientId("client-id")
             .setClientHost("hostname")
             .setTopologyId("topology-hash")
-            .setAssignor("assignor")
             .setProcessId("process-id")
             .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090))
             .setClientTags(mkMap(mkEntry("client", "tag")))
@@ -126,7 +128,6 @@ public class StreamsGroupMemberTest {
             .setClientId("client-id")
             .setClientHost("hostname")
             .setTopologyId("topology-hash")
-            .setAssignor("assignor")
             .setProcessId("process-id")
             .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090))
             .setClientTags(mkMap(mkEntry("client", "tag")))
@@ -145,7 +146,6 @@ public class StreamsGroupMemberTest {
             .setClientId("client-id")
             .setClientHost("hostname")
             .setTopologyId("topology-hash")
-            .setAssignor("assignor")
             .setProcessId("process-id")
             .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090))
             .setClientTags(mkMap(mkEntry("client", "tag")))
@@ -173,7 +173,6 @@ public class StreamsGroupMemberTest {
             .setClientId("client-id")
             .setClientHost("hostname")
             .setTopologyId("topology-hash")
-            .setAssignor("assignor")
             .setProcessId("process-id")
             .setUserEndpoint(new StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090))
             .setClientTags(mkMap(mkEntry("client", "tag")))
@@ -188,7 +187,6 @@ public class StreamsGroupMemberTest {
             .maybeUpdateRackId(Optional.empty())
             .maybeUpdateInstanceId(Optional.empty())
             .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty())
-            .maybeUpdateAssignor(Optional.empty())
             .build();
 
         assertEquals(member, updatedMember);
@@ -197,13 +195,11 @@ public class StreamsGroupMemberTest {
             .maybeUpdateRackId(Optional.of("new-rack-id"))
             .maybeUpdateInstanceId(Optional.of("new-instance-id"))
             .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000))
-            .maybeUpdateAssignor(Optional.of("new-assignor"))
             .build();
 
         assertEquals("new-instance-id", updatedMember.instanceId());
         assertEquals("new-rack-id", updatedMember.rackId());
         assertEquals(6000, updatedMember.rebalanceTimeoutMs());
-        assertEquals("new-assignor", updatedMember.assignor().get());
     }
 
     @Test
@@ -275,4 +271,117 @@ public class StreamsGroupMemberTest {
         assertEquals(mkAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9)), member.assignedWarmupTasks());
         assertEquals(mkAssignment(mkTaskAssignment(subtopologyId2, 2, 3, 1)), member.activeTasksPendingRevocation());
     }
+
+    @Test
+    public void testAsStreamsGroupDescribeMember() {
+        String subTopology1 = Uuid.randomUuid().toString();
+        String subTopology2 = Uuid.randomUuid().toString();
+        String subTopology3 = Uuid.randomUuid().toString();
+        String subTopology4 = Uuid.randomUuid().toString();
+        List<Integer> assignedTasks1 = Arrays.asList(0, 1, 2);
+        List<Integer> assignedTasks2 = Arrays.asList(3, 4, 5);
+        List<Integer> assignedTasks3 = Arrays.asList(6, 7, 8);
+        List<Integer> assignedTasks4 = Arrays.asList(5, 6, 7);
+        int epoch = 10;
+        StreamsGroupCurrentMemberAssignmentValue record = new StreamsGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(epoch)
+            .setPreviousMemberEpoch(epoch - 1)
+            .setActiveTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopology(subTopology1)
+                .setPartitions(assignedTasks1)))
+            .setStandbyTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopology(subTopology2)
+                .setPartitions(assignedTasks2)))
+            .setWarmupTasks(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopology(subTopology3)
+                .setPartitions(assignedTasks3)))
+            .setActiveTasksPendingRevocation(Collections.singletonList(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopology(subTopology4)
+                .setPartitions(assignedTasks4)));
+        String memberId = Uuid.randomUuid().toString();
+        String clientId = "clientId";
+        String instanceId = "instanceId";
+        String rackId = "rackId";
+        String clientHost = "clientHost";
+        String processId = "processId";
+        String topologyId = "topologyId";
+        Map<String, String> clientTags = Collections.singletonMap("key", "value");
+        org.apache.kafka.coordinator.group.streams.Assignment targetAssignment = new org.apache.kafka.coordinator.group.streams.Assignment(
+            mkMap(mkEntry(subTopology1, new HashSet<>(assignedTasks3))),
+            mkMap(mkEntry(subTopology2, new HashSet<>(assignedTasks2))),
+            mkMap(mkEntry(subTopology3, new HashSet<>(assignedTasks1)))
+        );
+        StreamsGroupMember member = new StreamsGroupMember.Builder(memberId)
+            .updateWith(record)
+            .setClientId(clientId)
+            .setInstanceId(instanceId)
+            .setRackId(rackId)
+            .setClientHost(clientHost)
+            .setProcessId(processId)
+            .setTopologyId(topologyId)
+            .setClientTags(clientTags)
+            .setAssignedActiveTasks(
+                mkMap(mkEntry(subTopology1, new HashSet<>(assignedTasks1)))
+            )
+            .setAssignedStandbyTasks(
+                mkMap(mkEntry(subTopology2, new HashSet<>(assignedTasks2)))
+            )
+            .setAssignedWarmupTasks(
+                mkMap(mkEntry(subTopology3, new HashSet<>(assignedTasks3)))
+            )
+            .setActiveTasksPendingRevocation(
+                mkMap(mkEntry(subTopology3, new HashSet<>(assignedTasks4)))
+            )
+            .build();
+
+        StreamsGroupDescribeResponseData.Member actual = member.asStreamsGroupDescribeMember(targetAssignment);
+        StreamsGroupDescribeResponseData.Member expected = new StreamsGroupDescribeResponseData.Member()
+            .setMemberId(memberId)
+            .setMemberEpoch(epoch)
+            .setClientId(clientId)
+            .setInstanceId(instanceId)
+            .setRackId(rackId)
+            .setClientHost(clientHost)
+            .setProcessId(processId)
+            .setTopologyId(topologyId)
+            .setClientTags(Collections.singletonList(new StreamsGroupDescribeResponseData.KeyValue().setKey("key").setValue("value")))
+            .setAssignment(
+                new StreamsGroupDescribeResponseData.Assignment()
+                    .setActiveTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopology(subTopology1)
+                        .setPartitions(assignedTasks1)))
+                    .setStandbyTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopology(subTopology2)
+                        .setPartitions(assignedTasks2)))
+                    .setWarmupTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopology(subTopology3)
+                        .setPartitions(assignedTasks3)))
+            )
+            .setTargetAssignment(
+                new StreamsGroupDescribeResponseData.Assignment()
+                    .setActiveTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopology(subTopology1)
+                        .setPartitions(assignedTasks3)))
+                    .setStandbyTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopology(subTopology2)
+                        .setPartitions(assignedTasks2)))
+                    .setWarmupTasks(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskIds()
+                        .setSubtopology(subTopology3)
+                        .setPartitions(assignedTasks1)))
+            );
+        // TODO: Add TaskOffsets
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeWithTargetAssignmentNull() {
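+        // A null target assignment should be rendered as an empty Assignment
+        // in the describe response.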
+        StreamsGroupMember member = new StreamsGroupMember.Builder(Uuid.randomUuid().toString())
+            .build();
+
+        StreamsGroupDescribeResponseData.Member streamsGroupDescribeMember = member.asStreamsGroupDescribeMember(
+            null);
+
+        assertEquals(new StreamsGroupDescribeResponseData.Assignment(), streamsGroupDescribeMember.targetAssignment());
+    }
 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index c142c9b9cd7..3da224f3a95 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -88,9 +88,7 @@ public class StreamsGroupTest {
 
         member = streamsGroup.getOrMaybeCreateMember("member", true);
 
-        member = new StreamsGroupMember.Builder(member)
-            .setAssignor("client")
-            .build();
+        member = new StreamsGroupMember.Builder(member).build();
 
         streamsGroup.updateMember(member);
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
new file mode 100644
index 00000000000..7ef086c9be5
--- /dev/null
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsTopologyTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+import org.junit.jupiter.api.Test;
+
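+/**
+ * Tests for {@link StreamsTopology}: accessors, topic subscription derivation,
+ * construction from a topology record, equals/hashCode/toString, and conversion
+ * to the StreamsGroupDescribe response representation.
+ */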
+public class StreamsTopologyTest {
+
+    @Test
+    public void streamsTopologyIdShouldBeCorrect() {
+        StreamsTopology topology = new StreamsTopology("topology-id", Collections.emptyMap());
+        assertEquals("topology-id", topology.topologyId());
+    }
+
+    @Test
+    public void subtopologiesShouldBeCorrect() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1")),
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+        );
+        StreamsTopology topology = new StreamsTopology("topology-id", 
subtopologies);
+        assertEquals(subtopologies, topology.subtopologies());
+    }
+
+    @Test
+    public void topicSubscriptionShouldBeCorrect() {
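+        // The topic subscription is expected to be the union of all source topics
+        // and repartition source topics across the subtopologies.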
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry("subtopology-1", new Subtopology()
+                .setSourceTopics(Arrays.asList("source-topic-1", 
"source-topic-2"))
+                .setRepartitionSourceTopics(Arrays.asList(
+                    new TopicInfo().setName("repartition-topic-1"),
+                    new TopicInfo().setName("repartition-topic-2")
+                ))
+            ),
+            mkEntry("subtopology-2", new Subtopology()
+                .setSourceTopics(Arrays.asList("source-topic-3", 
"source-topic-4"))
+                .setRepartitionSourceTopics(Arrays.asList(
+                    new TopicInfo().setName("repartition-topic-3"),
+                    new TopicInfo().setName("repartition-topic-4")
+                ))
+            )
+        );
+        StreamsTopology topology = new StreamsTopology("topology-id", subtopologies);
+        Set<String> expectedTopics = new HashSet<>(Arrays.asList(
+            "source-topic-1", "source-topic-2", "repartition-topic-1", "repartition-topic-2",
+            "source-topic-3", "source-topic-4", "repartition-topic-3", "repartition-topic-4"
+        ));
+        assertEquals(expectedTopics, topology.topicSubscription());
+    }
+
+    @Test
+    public void fromRecordShouldCreateCorrectTopology() {
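+        // fromRecord should key the subtopologies map by the subtopology id
+        // carried in the record's topology entries.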
+        StreamsGroupTopologyValue record = new StreamsGroupTopologyValue()
+            .setTopologyId("topology-id")
+            .setTopology(Arrays.asList(
+                new Subtopology().setSubtopology("subtopology-1"),
+                new Subtopology().setSubtopology("subtopology-2")
+            ));
+        StreamsTopology topology = StreamsTopology.fromRecord(record);
+        assertEquals("topology-id", topology.topologyId());
+        assertEquals(2, topology.subtopologies().size());
+        assertTrue(topology.subtopologies().containsKey("subtopology-1"));
+        assertTrue(topology.subtopologies().containsKey("subtopology-2"));
+    }
+
+    @Test
+    public void equalsShouldReturnTrueForEqualTopologies() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1")),
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+        );
+        StreamsTopology topology1 = new StreamsTopology("topology-id", subtopologies);
+        StreamsTopology topology2 = new StreamsTopology("topology-id", subtopologies);
+        assertEquals(topology1, topology2);
+    }
+
+    @Test
+    public void equalsShouldReturnFalseForDifferentTopologies() {
+        Map<String, Subtopology> subtopologies1 = mkMap(
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1"))
+        );
+        Map<String, Subtopology> subtopologies2 = mkMap(
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+        );
+        StreamsTopology topology1 = new StreamsTopology("topology-id-1", subtopologies1);
+        StreamsTopology topology2 = new StreamsTopology("topology-id-2", subtopologies2);
+        assertNotEquals(topology1, topology2);
+    }
+
+    @Test
+    public void hashCodeShouldBeConsistentWithEquals() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1")),
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+        );
+        StreamsTopology topology1 = new StreamsTopology("topology-id", subtopologies);
+        StreamsTopology topology2 = new StreamsTopology("topology-id", subtopologies);
+        assertEquals(topology1.hashCode(), topology2.hashCode());
+    }
+
+    @Test
+    public void toStringShouldReturnCorrectRepresentation() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry("subtopology-1", new 
Subtopology().setSubtopology("subtopology-1")),
+            mkEntry("subtopology-2", new 
Subtopology().setSubtopology("subtopology-2"))
+        );
+        StreamsTopology topology = new StreamsTopology("topology-id", subtopologies);
+        String expectedString = "StreamsTopology{topologyId=topology-id, subtopologies=" + subtopologies + "}";
+        assertEquals(expectedString, topology.toString());
+    }
+
+    @Test
+    public void asStreamsGroupDescribeTopologyShouldReturnCorrectSubtopologies() {
+        Map<String, Subtopology> subtopologies = mkMap(
+            mkEntry("subtopology-1", new Subtopology()
+                .setSourceTopicRegex("regex-1")
+                .setSubtopology("subtopology-1")
+                .setSourceTopics(Collections.singletonList("source-topic-1"))
+                .setRepartitionSinkTopics(Collections.singletonList("sink-topic-1"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(new TopicInfo().setName("repartition-topic-1")))
+                .setStateChangelogTopics(
+                    Collections.singletonList(new TopicInfo().setName("changelog-topic-1")))
+            ),
+            mkEntry("subtopology-2", new Subtopology()
+                .setSourceTopicRegex("regex-2")
+                .setSubtopology("subtopology-2")
+                .setSourceTopics(Collections.singletonList("source-topic-2"))
+                .setRepartitionSinkTopics(Collections.singletonList("sink-topic-2"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(new TopicInfo().setName("repartition-topic-2")))
+                .setStateChangelogTopics(
+                    Collections.singletonList(new TopicInfo().setName("changelog-topic-2")))
+            )
+        );
+        StreamsTopology topology = new StreamsTopology("topology-id", subtopologies);
+        List<StreamsGroupDescribeResponseData.Subtopology> result = topology.asStreamsGroupDescribeTopology();
+        assertEquals(2, result.size());
+        assertEquals("regex-1", result.get(0).sourceTopicRegex());
+        assertEquals("subtopology-1", result.get(0).subtopology());
+        assertEquals(Collections.singletonList("source-topic-1"), result.get(0).sourceTopics());
+        assertEquals(Collections.singletonList("sink-topic-1"), result.get(0).repartitionSinkTopics());
+        assertEquals("repartition-topic-1", result.get(0).repartitionSourceTopics().get(0).name());
+        assertEquals("changelog-topic-1", result.get(0).stateChangelogTopics().get(0).name());
+        assertEquals("regex-2", result.get(1).sourceTopicRegex());
+        assertEquals("subtopology-2", result.get(1).subtopology());
+        assertEquals(Collections.singletonList("source-topic-2"), result.get(1).sourceTopics());
+        assertEquals(Collections.singletonList("sink-topic-2"), result.get(1).repartitionSinkTopics());
+        assertEquals("repartition-topic-2", result.get(1).repartitionSourceTopics().get(0).name());
+        assertEquals("changelog-topic-2", result.get(1).stateChangelogTopics().get(0).name());
+    }
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index b16664b8f30..d97832eb969 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -225,8 +225,7 @@ public class TargetAssignmentBuilderTest {
                 .withTopology(topology)
                 .withStaticMembers(staticMembers)
                 .withSubscriptionMetadata(subscriptionMetadata)
-                .withTargetAssignment(targetAssignment)
-                .withTopicsImage(topicsImage);
+                .withTargetAssignment(targetAssignment);
 
             // Add the updated members or delete the deleted members.
             updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
@@ -262,12 +261,10 @@ public class TargetAssignmentBuilderTest {
             .topics();
 
         final Map<String, String> clientTags = mkMap(mkEntry("tag1", "value1"), mkEntry("tag2", "value2"));
-        final Map<String, String> assignmentConfigs = mkMap(mkEntry("conf1", "value1"), mkEntry("conf2", "value2"));
         StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
             .setRackId("rackId")
             .setInstanceId("instanceId")
             .setProcessId("processId")
-            .setAssignor("assignor")
             .setClientTags(clientTags)
             .build();
 