This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 618ea2c1ca6 KAFKA-18285: Add describeStreamsGroup to Admin API (#19116)
618ea2c1ca6 is described below

commit 618ea2c1ca60825a86a76453d4fe60eecdc01dee
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Mar 7 15:56:07 2025 +0100

    KAFKA-18285: Add describeStreamsGroup to Admin API (#19116)
    
    Adds `describeStreamsGroup` to Admin API.
    
    This exposes the result of the `DESCRIBE_STREAMS_GROUP` RPC in the Admin
    API.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  23 ++
 .../admin/DescribeStreamsGroupsOptions.java        |  41 +++
 .../clients/admin/DescribeStreamsGroupsResult.java |  68 ++++
 .../kafka/clients/admin/ForwardingAdmin.java       |   5 +
 .../kafka/clients/admin/KafkaAdminClient.java      |  12 +
 .../clients/admin/StreamsGroupDescription.java     | 180 ++++++++++
 .../admin/StreamsGroupMemberAssignment.java        | 150 +++++++++
 .../admin/StreamsGroupMemberDescription.java       | 371 +++++++++++++++++++++
 .../admin/StreamsGroupSubtopologyDescription.java  | 196 +++++++++++
 .../internals/DescribeStreamsGroupsHandler.java    | 284 ++++++++++++++++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 341 +++++++++++++++++++
 .../kafka/clients/admin/MockAdminClient.java       |   5 +
 .../TestingMetricsInterceptingAdminClient.java     |   7 +
 13 files changed, 1683 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 7f433736a9a..9b537f79305 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1959,6 +1959,29 @@ public interface Admin extends AutoCloseable {
      */
     DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, 
DeleteShareGroupsOptions options);
 
+    /**
+     * Describe streams groups in the cluster.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @param options  The options to use when describing the groups.
+     * @return The DescribeStreamsGroupsResult.
+     */
+    DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> 
groupIds,
+                                                      
DescribeStreamsGroupsOptions options);
+
+    /**
+     * Describe streams groups in the cluster, with the default options.
+     * <p>
+     * This is a convenience method for {@link 
#describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}
+     * with default options. See the overload for more details.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @return The DescribeStreamsGroupsResult.
+     */
+    default DescribeStreamsGroupsResult 
describeStreamsGroups(Collection<String> groupIds) {
+        return describeStreamsGroups(groupIds, new 
DescribeStreamsGroupsOptions());
+    }
+
     /**
      * Describe some classic groups in the cluster.
      *
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsOptions.java
new file mode 100644
index 00000000000..88e6d097768
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for {@link Admin#describeStreamsGroups(Collection, 
DescribeStreamsGroupsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DescribeStreamsGroupsOptions extends 
AbstractOptions<DescribeStreamsGroupsOptions> {
+    private boolean includeAuthorizedOperations;
+
+    public DescribeStreamsGroupsOptions includeAuthorizedOperations(boolean 
includeAuthorizedOperations) {
+        this.includeAuthorizedOperations = includeAuthorizedOperations;
+        return this;
+    }
+
+    public boolean includeAuthorizedOperations() {
+        return includeAuthorizedOperations;
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsResult.java
new file mode 100644
index 00000000000..1276da2de66
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The result of the {@link KafkaAdminClient#describeStreamsGroups(Collection, 
DescribeStreamsGroupsOptions)}} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DescribeStreamsGroupsResult {
+
+    private final Map<String, KafkaFuture<StreamsGroupDescription>> futures;
+
+    public DescribeStreamsGroupsResult(final Map<String, 
KafkaFuture<StreamsGroupDescription>> futures) {
+        this.futures = Map.copyOf(futures);
+    }
+
+    /**
+     * Return a map from group id to futures which yield streams group 
descriptions.
+     */
+    public Map<String, KafkaFuture<StreamsGroupDescription>> describedGroups() 
{
+        return new HashMap<>(futures);
+    }
+
+    /**
+     * Return a future which yields all StreamsGroupDescription objects, if 
all the describes succeed.
+     */
+    public KafkaFuture<Map<String, StreamsGroupDescription>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture<?>[0])).thenApply(
+            nil -> {
+                Map<String, StreamsGroupDescription> descriptions = new 
HashMap<>(futures.size());
+                futures.forEach((key, future) -> {
+                    try {
+                        descriptions.put(key, future.get());
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, since the 
KafkaFuture#allOf already ensured
+                        // that all of the futures completed successfully.
+                        throw new RuntimeException(e);
+                    }
+                });
+                return descriptions;
+            });
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index 8fb550ad542..f4c7b4c4876 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -333,6 +333,11 @@ public class ForwardingAdmin implements Admin {
         return delegate.deleteShareGroups(groupIds, options);
     }
 
+    @Override
+    public DescribeStreamsGroupsResult 
describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions 
options) {
+        return delegate.describeStreamsGroups(groupIds, options);
+    }
+
     @Override
     public ListGroupsResult listGroups(ListGroupsOptions options) {
         return delegate.listGroups(options);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index af42f4421f4..f6f484ca7bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -55,6 +55,7 @@ import 
org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
 import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler;
+import org.apache.kafka.clients.admin.internals.DescribeStreamsGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
 import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
 import 
org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
@@ -3840,6 +3841,17 @@ public class KafkaAdminClient extends AdminClient {
         return new ListShareGroupOffsetsResult(future.all());
     }
 
+    @Override
+    public DescribeStreamsGroupsResult describeStreamsGroups(final 
Collection<String> groupIds,
+                                                             final 
DescribeStreamsGroupsOptions options) {
+        SimpleAdminApiFuture<CoordinatorKey, StreamsGroupDescription> future =
+            DescribeStreamsGroupsHandler.newFuture(groupIds);
+        DescribeStreamsGroupsHandler handler = new 
DescribeStreamsGroupsHandler(options.includeAuthorizedOperations(), logContext);
+        invokeDriver(handler, future, options.timeoutMs);
+        return new DescribeStreamsGroupsResult(future.all().entrySet().stream()
+            .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
+    }
+    
     @Override
     public DescribeClassicGroupsResult describeClassicGroups(final 
Collection<String> groupIds,
                                                              final 
DescribeClassicGroupsOptions options) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupDescription.java
new file mode 100644
index 00000000000..024285324a0
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupDescription.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A detailed description of a single streams group in the cluster.
+ */
[email protected]
+public class StreamsGroupDescription {
+
+    private final String groupId;
+    private final int groupEpoch;
+    private final int targetAssignmentEpoch;
+    private final int topologyEpoch;
+    private final Collection<StreamsGroupSubtopologyDescription> subtopologies;
+    private final Collection<StreamsGroupMemberDescription> members;
+    private final GroupState groupState;
+    private final Node coordinator;
+    private final Set<AclOperation> authorizedOperations;
+
+    public StreamsGroupDescription(
+            final String groupId,
+            final int groupEpoch,
+            final int targetAssignmentEpoch,
+            final int topologyEpoch,
+            final Collection<StreamsGroupSubtopologyDescription> subtopologies,
+            final Collection<StreamsGroupMemberDescription> members,
+            final GroupState groupState,
+            final Node coordinator,
+            final Set<AclOperation> authorizedOperations
+    ) {
+        this.groupId = Objects.requireNonNull(groupId, "groupId must be 
non-null");
+        this.groupEpoch = groupEpoch;
+        this.targetAssignmentEpoch = targetAssignmentEpoch;
+        this.topologyEpoch = topologyEpoch;
+        this.subtopologies = Objects.requireNonNull(subtopologies, 
"subtopologies must be non-null");
+        this.members = Objects.requireNonNull(members, "members must be 
non-null");
+        this.groupState = Objects.requireNonNull(groupState, "groupState must 
be non-null");
+        this.coordinator = Objects.requireNonNull(coordinator, "coordinator 
must be non-null");
+        this.authorizedOperations = authorizedOperations;
+    }
+
+    /**
+     * The id of the streams group.
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * The epoch of the consumer group.
+     */
+    public int groupEpoch() {
+        return groupEpoch;
+    }
+
+    /**
+     * The epoch of the target assignment.
+     */
+    public int targetAssignmentEpoch() {
+        return targetAssignmentEpoch;
+    }
+
+    /**
+     * The epoch of the currently used topology.
+     */
+    public int topologyEpoch() {
+        return topologyEpoch;
+    }
+
+    /**
+     * A list of the members of the streams group.
+     */
+    public Collection<StreamsGroupMemberDescription> members() {
+        return members;
+    }
+
+    /**
+     * A list of the subtopologies in the streams group.
+     */
+    public Collection<StreamsGroupSubtopologyDescription> subtopologies() {
+        return subtopologies;
+    }
+
+    /**
+     * The state of the streams group, or UNKNOWN if the state is too new for 
us to parse.
+     */
+    public GroupState groupState() {
+        return groupState;
+    }
+
+    /**
+     * The group coordinator, or null if the coordinator is not known.
+     */
+    public Node coordinator() {
+        return coordinator;
+    }
+
+    /**
+     * authorizedOperations for this group, or null if that information is not 
known.
+     */
+    public Set<AclOperation> authorizedOperations() {
+        return authorizedOperations;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final StreamsGroupDescription that = (StreamsGroupDescription) o;
+        return groupEpoch == that.groupEpoch
+            && targetAssignmentEpoch == that.targetAssignmentEpoch
+            && topologyEpoch == that.topologyEpoch
+            && Objects.equals(groupId, that.groupId)
+            && Objects.equals(subtopologies, that.subtopologies)
+            && Objects.equals(members, that.members)
+            && groupState == that.groupState
+            && Objects.equals(coordinator, that.coordinator)
+            && Objects.equals(authorizedOperations, that.authorizedOperations);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            groupId,
+            groupEpoch,
+            targetAssignmentEpoch,
+            topologyEpoch,
+            subtopologies,
+            members,
+            groupState,
+            coordinator,
+            authorizedOperations
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "(" +
+            "groupId=" + groupId +
+            ", groupEpoch=" + groupEpoch +
+            ", targetAssignmentEpoch=" + targetAssignmentEpoch +
+            ", topologyEpoch=" + topologyEpoch +
+            ", subtopologies=" + 
subtopologies.stream().map(StreamsGroupSubtopologyDescription::toString).collect(Collectors.joining(","))
 +
+            ", members=" + 
members.stream().map(StreamsGroupMemberDescription::toString).collect(Collectors.joining(","))
 +
+            ", groupState=" + groupState +
+            ", coordinator=" + coordinator +
+            ", authorizedOperations=" + 
authorizedOperations.stream().map(AclOperation::toString).collect(Collectors.joining(","))
 +
+            ')';
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberAssignment.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberAssignment.java
new file mode 100644
index 00000000000..276058143ff
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberAssignment.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A description of the assignments of a specific group member.
+ */
[email protected]
+public class StreamsGroupMemberAssignment {
+
+    private final List<TaskIds> activeTasks;
+    private final List<TaskIds> standbyTasks;
+    private final List<TaskIds> warmupTasks;
+
+    public StreamsGroupMemberAssignment(
+        final List<TaskIds> activeTasks,
+        final List<TaskIds> standbyTasks,
+        final List<TaskIds> warmupTasks
+    ) {
+        this.activeTasks = activeTasks;
+        this.standbyTasks = standbyTasks;
+        this.warmupTasks = warmupTasks;
+    }
+
+    /**
+     * Active tasks for this client.
+     */
+    public List<TaskIds> activeTasks() {
+        return List.copyOf(activeTasks);
+    }
+
+    /**
+     * Standby tasks for this client.
+     */
+    public List<TaskIds> standbyTasks() {
+        return List.copyOf(standbyTasks);
+    }
+
+    /**
+     * Warmup tasks for this client.
+     */
+    public List<TaskIds> warmupTasks() {
+        return List.copyOf(warmupTasks);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final StreamsGroupMemberAssignment that = 
(StreamsGroupMemberAssignment) o;
+        return Objects.equals(activeTasks, that.activeTasks)
+            && Objects.equals(standbyTasks, that.standbyTasks)
+            && Objects.equals(warmupTasks, that.warmupTasks);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            activeTasks,
+            standbyTasks,
+            warmupTasks
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "(" +
+            "activeTasks=" + 
activeTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
+            ", standbyTasks=" + 
standbyTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
+            ", warmupTasks=" + 
warmupTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
+            ')';
+    }
+
+    /**
+     * All tasks for one subtopology of a member.
+     */
+    public static class TaskIds {
+        private final String subtopologyId;
+        private final List<Integer> partitions;
+
+        public TaskIds(final String subtopologyId, final List<Integer> 
partitions) {
+            this.subtopologyId = Objects.requireNonNull(subtopologyId, 
"subtopologyId must be non-null");
+            this.partitions = Objects.requireNonNull(partitions, "partitions 
must be non-null");
+        }
+
+        /**
+         * The subtopology identifier.
+         */
+        public String subtopologyId() {
+            return subtopologyId;
+        }
+
+        /**
+         * The partitions of the subtopology processed by this member.
+         */
+        public List<Integer> partitions() {
+            return List.copyOf(partitions);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final TaskIds taskIds = (TaskIds) o;
+            return Objects.equals(subtopologyId, taskIds.subtopologyId)
+                && Objects.equals(partitions, taskIds.partitions);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                subtopologyId,
+                partitions
+            );
+        }
+
+        @Override
+        public String toString() {
+            return partitions.stream().map(x -> subtopologyId + "_" + 
x).collect(Collectors.joining(","));
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberDescription.java
new file mode 100644
index 00000000000..8f4fa126e13
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberDescription.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A detailed description of a single streams groups member in the cluster.
+ */
[email protected]
+public class StreamsGroupMemberDescription {
+
+    private final String memberId;
+    private final int memberEpoch;
+    private final Optional<String> instanceId;
+    private final Optional<String> rackId;
+    private final String clientId;
+    private final String clientHost;
+    private final int topologyEpoch;
+    private final String processId;
+    private final Optional<Endpoint> userEndpoint;
+    private final Map<String, String> clientTags;
+    private final List<TaskOffset> taskOffsets;
+    private final List<TaskOffset> taskEndOffsets;
+    private final StreamsGroupMemberAssignment assignment;
+    private final StreamsGroupMemberAssignment targetAssignment;
+    private final boolean isClassic;
+
+    @SuppressWarnings("ParameterNumber")
+    public StreamsGroupMemberDescription(
+        final String memberId,
+        final int memberEpoch,
+        final Optional<String> instanceId,
+        final Optional<String> rackId,
+        final String clientId,
+        final String clientHost,
+        final int topologyEpoch,
+        final String processId,
+        final Optional<Endpoint> userEndpoint,
+        final Map<String, String> clientTags,
+        final List<TaskOffset> taskOffsets,
+        final List<TaskOffset> taskEndOffsets,
+        final StreamsGroupMemberAssignment assignment,
+        final StreamsGroupMemberAssignment targetAssignment,
+        final boolean isClassic
+    ) {
+        this.memberId = Objects.requireNonNull(memberId);
+        this.memberEpoch = memberEpoch;
+        this.instanceId = Objects.requireNonNull(instanceId);
+        this.rackId = Objects.requireNonNull(rackId);
+        this.clientId = Objects.requireNonNull(clientId);
+        this.clientHost = Objects.requireNonNull(clientHost);
+        this.topologyEpoch = topologyEpoch;
+        this.processId = Objects.requireNonNull(processId);
+        this.userEndpoint = Objects.requireNonNull(userEndpoint);
+        this.clientTags = Objects.requireNonNull(clientTags);
+        this.taskOffsets = Objects.requireNonNull(taskOffsets);
+        this.taskEndOffsets = Objects.requireNonNull(taskEndOffsets);
+        this.assignment = Objects.requireNonNull(assignment);
+        this.targetAssignment = Objects.requireNonNull(targetAssignment);
+        this.isClassic = isClassic;
+    }
+
+    /**
+     * The id of the group member.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * The epoch of the group member.
+     */
+    public int memberEpoch() {
+        return memberEpoch;
+    }
+
+    /**
+     * The id of the instance, used for static membership, if available.
+     */
+    public Optional<String> instanceId() {
+        return instanceId;
+    }
+
+    /**
+     * The rack ID of the group member.
+     */
+    public Optional<String> rackId() {
+        return rackId;
+    }
+
+    /**
+     * The client ID of the group member.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * The host of the group member.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * The epoch of the topology present on the client.
+     */
+    public int topologyEpoch() {
+        return topologyEpoch;
+    }
+
+    /**
+     * Identity of the streams instance that may have multiple clients.
+     */
+    public String processId() {
+        return processId;
+    }
+
+    /**
+     * User-defined endpoint for Interactive Queries.
+     */
+    public Optional<Endpoint> userEndpoint() {
+        return userEndpoint;
+    }
+
+    /**
+     * Used for rack-aware assignment algorithm.
+     */
+    public Map<String, String> clientTags() {
+        return Map.copyOf(clientTags);
+    }
+
+    /**
+     * Cumulative offsets for tasks.
+     */
+    public List<TaskOffset> taskOffsets() {
+        return List.copyOf(taskOffsets);
+    }
+
+    /**
+     * Cumulative task changelog end offsets for tasks.
+     */
+    public List<TaskOffset> taskEndOffsets() {
+        return List.copyOf(taskEndOffsets);
+    }
+
+    /**
+     * The current assignment.
+     */
+    public StreamsGroupMemberAssignment assignment() {
+        return assignment;
+    }
+
+    /**
+     * The target assignment.
+     */
+    public StreamsGroupMemberAssignment targetAssignment() {
+        return targetAssignment;
+    }
+
+    /**
+     * The flag indicating whether a member is classic.
+     */
+    public boolean isClassic() {
+        return isClassic;
+    }
+
+    @SuppressWarnings("CyclomaticComplexity")
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final StreamsGroupMemberDescription that = 
(StreamsGroupMemberDescription) o;
+        return memberEpoch == that.memberEpoch
+            && topologyEpoch == that.topologyEpoch
+            && isClassic == that.isClassic
+            && Objects.equals(memberId, that.memberId)
+            && Objects.equals(instanceId, that.instanceId)
+            && Objects.equals(rackId, that.rackId)
+            && Objects.equals(clientId, that.clientId)
+            && Objects.equals(clientHost, that.clientHost)
+            && Objects.equals(processId, that.processId)
+            && Objects.equals(userEndpoint, that.userEndpoint)
+            && Objects.equals(clientTags, that.clientTags)
+            && Objects.equals(taskOffsets, that.taskOffsets)
+            && Objects.equals(taskEndOffsets, that.taskEndOffsets)
+            && Objects.equals(assignment, that.assignment)
+            && Objects.equals(targetAssignment, that.targetAssignment);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            memberId,
+            memberEpoch,
+            instanceId,
+            rackId,
+            clientId,
+            clientHost,
+            topologyEpoch,
+            processId,
+            userEndpoint,
+            clientTags,
+            taskOffsets,
+            taskEndOffsets,
+            assignment,
+            targetAssignment,
+            isClassic
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "(" +
+            "memberId=" + memberId +
+            ", memberEpoch=" + memberEpoch +
+            ", instanceId=" + instanceId.orElse("null") +
+            ", rackId=" + rackId.orElse("null") +
+            ", clientId=" + clientId +
+            ", clientHost=" + clientHost +
+            ", topologyEpoch=" + topologyEpoch +
+            ", processId=" + processId +
+            ", userEndpoint=" + 
userEndpoint.map(Endpoint::toString).orElse("null") +
+            ", clientTags=" + clientTags +
+            ", taskOffsets=" + 
taskOffsets.stream().map(TaskOffset::toString).collect(Collectors.joining(",")) 
+
+            ", taskEndOffsets=" + 
taskEndOffsets.stream().map(TaskOffset::toString).collect(Collectors.joining(","))
 +
+            ", assignment=" + assignment +
+            ", targetAssignment=" + targetAssignment +
+            ", isClassic=" + isClassic +
+            ')';
+    }
+
+    /**
+     * The user-defined endpoint for the member.
+     */
+    public static class Endpoint {
+
+        private final String host;
+        private final int port;
+
+        public Endpoint(final String host, final int port) {
+            this.host = Objects.requireNonNull(host);
+            this.port = port;
+        }
+
+        public String host() {
+            return host;
+        }
+
+        public int port() {
+            return port;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final Endpoint endpoint = (Endpoint) o;
+            return port == endpoint.port && Objects.equals(host, 
endpoint.host);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(host, port);
+        }
+
+        @Override
+        public String toString() {
+            return "(" +
+                "host=" + host +
+                ", port=" + port +
+                ')';
+        }
+    }
+
+    /**
+     * The cumulative offset for one task.
+     */
+    public static class TaskOffset {
+
+        private final String subtopologyId;
+        private final int partition;
+        private final long offset;
+
+        public TaskOffset(final String subtopologyId, final int partition, 
final long offset) {
+            this.subtopologyId = Objects.requireNonNull(subtopologyId);
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        /**
+         * The subtopology identifier.
+         */
+        public String subtopologyId() {
+            return subtopologyId;
+        }
+
+        /**
+         * The partition of the task.
+         */
+        public int partition() {
+            return partition;
+        }
+
+        /**
+         * The cumulative offset (sum of offsets in all input partitions).
+         */
+        public long offset() {
+            return offset;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final TaskOffset that = (TaskOffset) o;
+            return partition == that.partition
+                && offset == that.offset
+                && Objects.equals(subtopologyId, that.subtopologyId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                subtopologyId,
+                partition,
+                offset
+            );
+        }
+
+        @Override
+        public String toString() {
+            return subtopologyId +
+                "_" + partition +
+                "=" + offset;
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupSubtopologyDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupSubtopologyDescription.java
new file mode 100644
index 00000000000..e01cafc98d4
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupSubtopologyDescription.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A detailed description of a subtopology in a streams group.
+ */
[email protected]
+public class StreamsGroupSubtopologyDescription {
+
+    private final String subtopologyId;
+    private final List<String> sourceTopics;
+    private final List<String> repartitionSinkTopics;
+    private final Map<String, TopicInfo> stateChangelogTopics;
+    private final Map<String, TopicInfo> repartitionSourceTopics;
+
+    public StreamsGroupSubtopologyDescription(
+        final String subtopologyId,
+        final List<String> sourceTopics,
+        final List<String> repartitionSinkTopics,
+        final Map<String, TopicInfo> stateChangelogTopics,
+        final Map<String, TopicInfo> repartitionSourceTopics
+    ) {
+        this.subtopologyId = Objects.requireNonNull(subtopologyId, 
"subtopologyId must be non-null");
+        this.sourceTopics = Objects.requireNonNull(sourceTopics, "sourceTopics 
must be non-null");
+        this.repartitionSinkTopics = 
Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics must be 
non-null");
+        this.stateChangelogTopics = 
Objects.requireNonNull(stateChangelogTopics, "stateChangelogTopics must be 
non-null");
+        this.repartitionSourceTopics = 
Objects.requireNonNull(repartitionSourceTopics, "repartitionSourceTopics must 
be non-null");
+    }
+
+    /**
+     * String to uniquely identify the subtopology.
+     */
+    public String subtopologyId() {
+        return subtopologyId;
+    }
+
+    /**
+     * The topics the topology reads from.
+     */
+    public List<String> sourceTopics() {
+        return List.copyOf(sourceTopics);
+    }
+
+    /**
+     * The repartition topics the topology writes to.
+     */
+    public List<String> repartitionSinkTopics() {
+        return List.copyOf(repartitionSinkTopics);
+    }
+
+    /**
+     * The set of state changelog topics associated with this subtopology.
+     */
+    public Map<String, TopicInfo> stateChangelogTopics() {
+        return Map.copyOf(stateChangelogTopics);
+    }
+
+    /**
+     * The set of source topics that are internally created repartition topics.
+     */
+    public Map<String, TopicInfo> repartitionSourceTopics() {
+        return Map.copyOf(repartitionSourceTopics);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final StreamsGroupSubtopologyDescription that = 
(StreamsGroupSubtopologyDescription) o;
+        return Objects.equals(subtopologyId, that.subtopologyId)
+            && Objects.equals(sourceTopics, that.sourceTopics)
+            && Objects.equals(repartitionSinkTopics, 
that.repartitionSinkTopics)
+            && Objects.equals(stateChangelogTopics, that.stateChangelogTopics)
+            && Objects.equals(repartitionSourceTopics, 
that.repartitionSourceTopics);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+            subtopologyId,
+            sourceTopics,
+            repartitionSinkTopics,
+            stateChangelogTopics,
+            repartitionSourceTopics
+        );
+    }
+
+    @Override
+    public String toString() {
+        return "(" +
+            "subtopologyId='" + subtopologyId + '\'' +
+            ", sourceTopics=" + sourceTopics +
+            ", repartitionSinkTopics=" + repartitionSinkTopics +
+            ", stateChangelogTopics=" + stateChangelogTopics +
+            ", repartitionSourceTopics=" + repartitionSourceTopics +
+            ')';
+    }
+
+    /**
+     * Information about a topic. These configs reflect what is required by 
the topology, but may differ from the current state on the
+     * broker.
+     */
+    public static class TopicInfo {
+
+        private final int partitions;
+        private final int replicationFactor;
+        private final Map<String, String> topicConfigs;
+
+        public TopicInfo(final int partitions, final int replicationFactor, 
final Map<String, String> topicConfigs) {
+            this.partitions = partitions;
+            this.replicationFactor = replicationFactor;
+            this.topicConfigs = Objects.requireNonNull(topicConfigs, 
"topicConfigs must be non-null");
+        }
+
+        /**
+         * The number of partitions in the topic. Can be 0 if no specific 
number of partitions is enforced.
+         */
+        public int partitions() {
+            return partitions;
+        }
+
+        /**
+         * The replication factor of the topic. Can be 0 if the default 
replication factor is used.
+         */
+        public int replicationFactor() {
+            return replicationFactor;
+        }
+
+        /**
+         * Topic-level configurations as key-value pairs. Default 
configuration can be omitted.
+         */
+        public Map<String, String> topicConfigs() {
+            return Map.copyOf(topicConfigs);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final TopicInfo topicInfo = (TopicInfo) o;
+            return partitions == topicInfo.partitions
+                && replicationFactor == topicInfo.replicationFactor
+                && Objects.equals(topicConfigs, topicInfo.topicConfigs);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                partitions,
+                replicationFactor,
+                topicConfigs
+            );
+        }
+
+        @Override
+        public String toString() {
+            return "TopicInfo(" +
+                "partitions=" + partitions +
+                ", replicationFactor=" + replicationFactor +
+                ", topicConfigs=" + topicConfigs.entrySet().stream().map(x -> 
x.getKey() + "=" + x.getValue())
+                .collect(Collectors.joining(",")) +
+                ')';
+        }
+    }
+
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
new file mode 100644
index 00000000000..8bf793bd31c
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
+import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
+import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
+import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
+import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.clients.admin.internals.AdminUtils.validAclOperations;
+
+public class DescribeStreamsGroupsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, StreamsGroupDescription> {
+
+    private final boolean includeAuthorizedOperations;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public DescribeStreamsGroupsHandler(
+          boolean includeAuthorizedOperations,
+          LogContext logContext) {
+        this.includeAuthorizedOperations = includeAuthorizedOperations;
+        this.log = logContext.logger(DescribeStreamsGroupsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> 
groupIds) {
+        return groupIds.stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
StreamsGroupDescription> newFuture(Collection<String> groupIds) {
+        return AdminApiFuture.forKeys(buildKeySet(groupIds));
+    }
+
+    @Override
+    public String apiName() {
+        return "describeStreamsGroups";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    @Override
+    public StreamsGroupDescribeRequest.Builder buildBatchedRequest(int 
coordinatorId, Set<CoordinatorKey> keys) {
+        List<String> groupIds = keys.stream().map(key -> {
+            if (key.type != CoordinatorType.GROUP) {
+                throw new IllegalArgumentException("Invalid group coordinator 
key " + key +
+                    " when building `DescribeStreamsGroups` request");
+            }
+            return key.idValue;
+        }).collect(Collectors.toList());
+        StreamsGroupDescribeRequestData data = new 
StreamsGroupDescribeRequestData()
+            .setGroupIds(groupIds)
+            .setIncludeAuthorizedOperations(includeAuthorizedOperations);
+        return new StreamsGroupDescribeRequest.Builder(data, true);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, StreamsGroupDescription> handleResponse(
+            Node coordinator,
+            Set<CoordinatorKey> groupIds,
+            AbstractResponse abstractResponse) {
+        final StreamsGroupDescribeResponse response = 
(StreamsGroupDescribeResponse) abstractResponse;
+        final Map<CoordinatorKey, StreamsGroupDescription> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+
+        for (StreamsGroupDescribeResponseData.DescribedGroup describedGroup : 
response.data().groups()) {
+            CoordinatorKey groupIdKey = 
CoordinatorKey.byGroupId(describedGroup.groupId());
+            Errors error = Errors.forCode(describedGroup.errorCode());
+            if (error != Errors.NONE) {
+                handleError(groupIdKey, describedGroup, coordinator, error, 
describedGroup.errorMessage(), completed, failed, groupsToUnmap);
+                continue;
+            }
+            if (describedGroup.topology() == null) {
+                log.error("`DescribeStreamsGroups` response for group id {} is 
missing the topology information", groupIdKey.idValue);
+                failed.put(groupIdKey, new IllegalStateException("Topology 
information is missing"));
+                continue;
+            }
+
+            final Set<AclOperation> authorizedOperations = 
validAclOperations(describedGroup.authorizedOperations());
+
+            final StreamsGroupDescription streamsGroupDescription = new 
StreamsGroupDescription(
+                    describedGroup.groupId(),
+                    describedGroup.groupEpoch(),
+                    describedGroup.assignmentEpoch(),
+                    describedGroup.topology().epoch(),
+                    
convertSubtopologies(describedGroup.topology().subtopologies()),
+                    convertMembers(describedGroup.members()),
+                    GroupState.parse(describedGroup.groupState()),
+                    coordinator,
+                    authorizedOperations
+            );
+            completed.put(groupIdKey, streamsGroupDescription);
+        }
+
+        return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
+    }
+
+    private Collection<StreamsGroupMemberDescription> convertMembers(final 
List<StreamsGroupDescribeResponseData.Member> members) {
+        final List<StreamsGroupMemberDescription> memberDescriptions = new 
ArrayList<>(members.size());
+        members.forEach(groupMember ->
+            memberDescriptions.add(new StreamsGroupMemberDescription(
+                groupMember.memberId(),
+                groupMember.memberEpoch(),
+                Optional.ofNullable(groupMember.instanceId()),
+                Optional.ofNullable(groupMember.rackId()),
+                groupMember.clientId(),
+                groupMember.clientHost(),
+                groupMember.topologyEpoch(),
+                groupMember.processId(),
+                
Optional.ofNullable(groupMember.userEndpoint()).map(this::convertEndpoint),
+                convertClientTags(groupMember.clientTags()),
+                convertTaskOffsets(groupMember.taskOffsets()),
+                convertTaskOffsets(groupMember.taskEndOffsets()),
+                convertAssignment(groupMember.assignment()),
+                convertAssignment(groupMember.targetAssignment()),
+                groupMember.isClassic()
+            ))
+        );
+        return memberDescriptions;
+    }
+
+    private Collection<StreamsGroupSubtopologyDescription> 
convertSubtopologies(final List<StreamsGroupDescribeResponseData.Subtopology> 
subtopologies) {
+        final List<StreamsGroupSubtopologyDescription> subtopologyDescriptions 
= new ArrayList<>(subtopologies.size());
+        subtopologies.forEach(subtopology ->
+            subtopologyDescriptions.add(new StreamsGroupSubtopologyDescription(
+                subtopology.subtopologyId(),
+                subtopology.sourceTopics(),
+                subtopology.repartitionSinkTopics(),
+                convertTopicInfos(subtopology.stateChangelogTopics()),
+                convertTopicInfos(subtopology.repartitionSourceTopics())
+            ))
+        );
+        return subtopologyDescriptions;
+    }
+
+    private Map<String, StreamsGroupSubtopologyDescription.TopicInfo> 
convertTopicInfos(final List<StreamsGroupDescribeResponseData.TopicInfo> 
topicInfos) {
+        return topicInfos.stream().collect(Collectors.toMap(
+            StreamsGroupDescribeResponseData.TopicInfo::name,
+            topicInfo -> new StreamsGroupSubtopologyDescription.TopicInfo(
+                topicInfo.partitions(),
+                topicInfo.replicationFactor(),
+                topicInfo.topicConfigs().stream().collect(Collectors.toMap(
+                    StreamsGroupDescribeResponseData.KeyValue::key,
+                    StreamsGroupDescribeResponseData.KeyValue::value
+                ))
+            )
+        ));
+    }
+
+    private StreamsGroupMemberAssignment.TaskIds convertTaskIds(final 
StreamsGroupDescribeResponseData.TaskIds taskIds) {
+        return new StreamsGroupMemberAssignment.TaskIds(
+            taskIds.subtopologyId(),
+            taskIds.partitions()
+        );
+    }
+
+    private StreamsGroupMemberAssignment convertAssignment(final 
StreamsGroupDescribeResponseData.Assignment assignment) {
+        return new StreamsGroupMemberAssignment(
+            
assignment.activeTasks().stream().map(this::convertTaskIds).collect(Collectors.toList()),
+            
assignment.standbyTasks().stream().map(this::convertTaskIds).collect(Collectors.toList()),
+            
assignment.warmupTasks().stream().map(this::convertTaskIds).collect(Collectors.toList())
+        );
+    }
+
+    private List<StreamsGroupMemberDescription.TaskOffset> 
convertTaskOffsets(final List<StreamsGroupDescribeResponseData.TaskOffset> 
taskOffsets) {
+        return taskOffsets.stream().map(taskOffset ->
+            new StreamsGroupMemberDescription.TaskOffset(
+                taskOffset.subtopologyId(),
+                taskOffset.partition(),
+                taskOffset.offset()
+            )
+        ).collect(Collectors.toList());
+    }
+
+    private Map<String, String> convertClientTags(final 
List<StreamsGroupDescribeResponseData.KeyValue> keyValues) {
+        return keyValues.stream().collect(Collectors.toMap(
+            StreamsGroupDescribeResponseData.KeyValue::key,
+            StreamsGroupDescribeResponseData.KeyValue::value
+        ));
+    }
+
+    private StreamsGroupMemberDescription.Endpoint convertEndpoint(final 
StreamsGroupDescribeResponseData.Endpoint endpoint) {
+        return new StreamsGroupMemberDescription.Endpoint(endpoint.host(), 
endpoint.port());
+    }
+
+
+    private void handleError(
+            CoordinatorKey groupId,
+            StreamsGroupDescribeResponseData.DescribedGroup describedGroup,
+            Node coordinator,
+            Errors error,
+            String errorMsg,
+            Map<CoordinatorKey, StreamsGroupDescription> completed,
+            Map<CoordinatorKey, Throwable> failed,
+            Set<CoordinatorKey> groupsToUnmap) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+                log.debug("`DescribeStreamsGroups` request for group id {} 
failed due to error {}", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`DescribeStreamsGroups` request for group id {} 
failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                break;
+
+            case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`DescribeStreamsGroups` request for group id {} 
returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
+
+            case GROUP_ID_NOT_FOUND:
+                // In order to maintain compatibility with 
describeConsumerGroups, an unknown group ID is
+                // reported as a DEAD streams group, and the admin client 
operation did not fail
+                log.debug("`DescribeStreamsGroups` request for group id {} 
failed because the group does not exist. {}",
+                    groupId.idValue, errorMsg != null ? errorMsg : "");
+                final StreamsGroupDescription streamsGroupDescription =
+                    new StreamsGroupDescription(
+                        groupId.idValue,
+                        -1,
+                        -1,
+                        -1,
+                        Collections.emptySet(),
+                        Collections.emptySet(),
+                        GroupState.DEAD,
+                        coordinator,
+                        
validAclOperations(describedGroup.authorizedOperations()));
+                completed.put(groupId, streamsGroupDescription);
+                break;
+
+            default:
+                log.error("`DescribeStreamsGroups` request for group id {} 
failed due to unexpected error {}", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
+        }
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7b888e83962..17569ed3956 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -160,6 +160,7 @@ import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequest
 import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
 import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -237,6 +238,7 @@ import 
org.apache.kafka.common.requests.RemoveRaftVoterRequest;
 import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
+import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
 import org.apache.kafka.common.requests.UnregisterBrokerResponse;
 import org.apache.kafka.common.requests.UpdateFeaturesRequest;
 import org.apache.kafka.common.requests.UpdateFeaturesResponse;
@@ -5762,6 +5764,233 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeStreamsGroups() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Retriable FindCoordinatorResponse errors should be retried
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
  Node.noNode()));
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
  Node.noNode()));
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            StreamsGroupDescribeResponseData data = new 
StreamsGroupDescribeResponseData();
+
+            // Retriable errors should be retried
+            data.groups().add(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()));
+            env.kafkaClient().prepareResponse(new 
StreamsGroupDescribeResponse(data));
+
+            // We need to return two responses here, one with NOT_COORDINATOR 
error when calling describe streams group
+            // api using coordinator that has moved. This will retry whole 
operation. So we need to again respond with a
+            // FindCoordinatorResponse.
+            //
+            // And the same reason for COORDINATOR_NOT_AVAILABLE error response
+            data = new StreamsGroupDescribeResponseData();
+            data.groups().add(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setErrorCode(Errors.NOT_COORDINATOR.code()));
+            env.kafkaClient().prepareResponse(new 
StreamsGroupDescribeResponse(data));
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            data = new StreamsGroupDescribeResponseData();
+            data.groups().add(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+            env.kafkaClient().prepareResponse(new 
StreamsGroupDescribeResponse(data));
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            data = makeFullStreamsGroupDescribeResponse();
+
+            env.kafkaClient().prepareResponse(new 
StreamsGroupDescribeResponse(data));
+
+            final DescribeStreamsGroupsResult result = 
env.adminClient().describeStreamsGroups(singletonList(GROUP_ID));
+            final StreamsGroupDescription groupDescription = 
result.describedGroups().get(GROUP_ID).get();
+
+            final String subtopologyId = "my_subtopology";
+            StreamsGroupMemberAssignment.TaskIds expectedActiveTasks1 =
+                new StreamsGroupMemberAssignment.TaskIds(subtopologyId, 
asList(0, 1, 2));
+            StreamsGroupMemberAssignment.TaskIds expectedStandbyTasks1 =
+                new StreamsGroupMemberAssignment.TaskIds(subtopologyId, 
asList(3, 4, 5));
+            StreamsGroupMemberAssignment.TaskIds expectedWarmupTasks1 =
+                new StreamsGroupMemberAssignment.TaskIds(subtopologyId, 
asList(6, 7, 8));
+            StreamsGroupMemberAssignment.TaskIds expectedActiveTasks2 =
+                new StreamsGroupMemberAssignment.TaskIds(subtopologyId, 
asList(3, 4, 5));
+            StreamsGroupMemberAssignment.TaskIds expectedStandbyTasks2 =
+                new StreamsGroupMemberAssignment.TaskIds(subtopologyId, 
asList(6, 7, 8));
+            StreamsGroupMemberAssignment.TaskIds expectedWarmupTasks2 =
+                new StreamsGroupMemberAssignment.TaskIds(subtopologyId, 
asList(0, 1, 2));
+            StreamsGroupMemberAssignment expectedMemberAssignment = new 
StreamsGroupMemberAssignment(
+                singletonList(expectedActiveTasks1),
+                singletonList(expectedStandbyTasks1),
+                singletonList(expectedWarmupTasks1)
+            );
+            StreamsGroupMemberAssignment expectedTargetAssignment = new 
StreamsGroupMemberAssignment(
+                singletonList(expectedActiveTasks2),
+                singletonList(expectedStandbyTasks2),
+                singletonList(expectedWarmupTasks2)
+            );
+            final String instanceId = "instance-id";
+            final String rackId = "rack-id";
+            StreamsGroupMemberDescription expectedMemberOne = new 
StreamsGroupMemberDescription(
+                "0",
+                1,
+                Optional.of(instanceId),
+                Optional.of(rackId),
+                "clientId0",
+                "clientHost",
+                0,
+                "processId",
+                Optional.of(new 
StreamsGroupMemberDescription.Endpoint("localhost", 8080)),
+                Collections.singletonMap("key", "value"),
+                Collections.singletonList(new 
StreamsGroupMemberDescription.TaskOffset(subtopologyId, 0, 0)),
+                Collections.singletonList(new 
StreamsGroupMemberDescription.TaskOffset(subtopologyId, 0, 1)),
+                expectedMemberAssignment,
+                expectedTargetAssignment,
+                true
+            );
+
+            StreamsGroupMemberDescription expectedMemberTwo = new 
StreamsGroupMemberDescription(
+                "1",
+                2,
+                Optional.empty(),
+                Optional.empty(),
+                "clientId1",
+                "clientHost",
+                1,
+                "processId2",
+                Optional.empty(),
+                Collections.emptyMap(),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                new StreamsGroupMemberAssignment(Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList()),
+                new StreamsGroupMemberAssignment(Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList()),
+                false
+            );
+
+            StreamsGroupSubtopologyDescription expectedSubtopologyDescription 
= new StreamsGroupSubtopologyDescription(
+                subtopologyId,
+                Collections.singletonList("my_source_topic"),
+                Collections.singletonList("my_repartition_sink_topic"),
+                Collections.singletonMap(
+                    "my_changelog_topic",
+                    new StreamsGroupSubtopologyDescription.TopicInfo(
+                        0,
+                        (short) 3,
+                        Collections.singletonMap("key1", "value1")
+                    )
+                ),
+                Collections.singletonMap(
+                    "my_repartition_topic",
+                    new StreamsGroupSubtopologyDescription.TopicInfo(
+                        99,
+                        (short) 0,
+                        Collections.emptyMap()
+                    )
+                )
+            );
+
+            assertEquals(1, result.describedGroups().size());
+            assertEquals(GROUP_ID, groupDescription.groupId());
+            assertEquals(2, groupDescription.members().size());
+            Iterator<StreamsGroupMemberDescription> members = 
groupDescription.members().iterator();
+            assertEquals(expectedMemberOne, members.next());
+            assertEquals(expectedMemberTwo, members.next());
+            assertEquals(1, groupDescription.subtopologies().size());
+            assertEquals(expectedSubtopologyDescription, 
groupDescription.subtopologies().iterator().next());
+            assertEquals(2, groupDescription.groupEpoch());
+            assertEquals(1, groupDescription.targetAssignmentEpoch());
+
+        }
+    }
+
+    @Test
+    public void testDescribeStreamsGroupsWithAuthorizedOperationsOmitted() 
throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            StreamsGroupDescribeResponseData data = 
makeFullStreamsGroupDescribeResponse();
+
+            data.groups().iterator().next()
+                
.setAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+
+            env.kafkaClient().prepareResponse(new 
StreamsGroupDescribeResponse(data));
+
+            final DescribeStreamsGroupsResult result = 
env.adminClient().describeStreamsGroups(singletonList(GROUP_ID));
+            final StreamsGroupDescription groupDescription = 
result.describedGroups().get(GROUP_ID).get();
+
+            assertNull(groupDescription.authorizedOperations());
+        }
+    }
+
+    @Test
+    public void testDescribeMultipleStreamsGroups() {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            StreamsGroupDescribeResponseData.TaskIds activeTasks = new 
StreamsGroupDescribeResponseData.TaskIds()
+                .setSubtopologyId("my_subtopology")
+                .setPartitions(asList(0, 1, 2));
+            StreamsGroupDescribeResponseData.TaskIds standbyTasks = new 
StreamsGroupDescribeResponseData.TaskIds()
+                .setSubtopologyId("my_subtopology")
+                .setPartitions(asList(3, 4, 5));
+            StreamsGroupDescribeResponseData.TaskIds warmupTasks = new 
StreamsGroupDescribeResponseData.TaskIds()
+                .setSubtopologyId("my_subtopology")
+                .setPartitions(asList(6, 7, 8));
+            final StreamsGroupDescribeResponseData.Assignment memberAssignment 
= new StreamsGroupDescribeResponseData.Assignment()
+                .setActiveTasks(singletonList(activeTasks))
+                .setStandbyTasks(singletonList(standbyTasks))
+                .setWarmupTasks(singletonList(warmupTasks));
+            StreamsGroupDescribeResponseData group0Data = new 
StreamsGroupDescribeResponseData();
+            group0Data.groups().add(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setGroupState(GroupState.STABLE.toString())
+                .setMembers(asList(
+                    new StreamsGroupDescribeResponseData.Member()
+                        .setMemberId("0")
+                        .setClientId("clientId0")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment),
+                    new StreamsGroupDescribeResponseData.Member()
+                        .setMemberId("1")
+                        .setClientId("clientId1")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment))));
+
+            StreamsGroupDescribeResponseData group1Data = new 
StreamsGroupDescribeResponseData();
+            group1Data.groups().add(new 
StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("group-1")
+                .setGroupState(GroupState.STABLE.toString())
+                .setMembers(asList(
+                    new StreamsGroupDescribeResponseData.Member()
+                        .setMemberId("0")
+                        .setClientId("clientId0")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment),
+                    new StreamsGroupDescribeResponseData.Member()
+                        .setMemberId("1")
+                        .setClientId("clientId1")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment))));
+
+            env.kafkaClient().prepareResponse(new 
StreamsGroupDescribeResponse(group0Data));
+            env.kafkaClient().prepareResponse(new 
StreamsGroupDescribeResponse(group1Data));
+
+            Collection<String> groups = new HashSet<>();
+            groups.add(GROUP_ID);
+            groups.add("group-1");
+            final DescribeStreamsGroupsResult result = 
env.adminClient().describeStreamsGroups(groups);
+            assertEquals(2, result.describedGroups().size());
+            assertEquals(groups, result.describedGroups().keySet());
+        }
+    }
+
     @Test
     public void testDescribeShareGroups() throws Exception {
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
@@ -10281,4 +10510,116 @@ public class KafkaAdminClientTest {
             assertNull(result.partitionResult(barPartition0).get());
         }
     }
+
+    private static StreamsGroupDescribeResponseData 
makeFullStreamsGroupDescribeResponse() {
+        StreamsGroupDescribeResponseData data;
+        StreamsGroupDescribeResponseData.TaskIds activeTasks1 = new 
StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(0, 1, 2));
+        StreamsGroupDescribeResponseData.TaskIds standbyTasks1 = new 
StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(3, 4, 5));
+        StreamsGroupDescribeResponseData.TaskIds warmupTasks1 = new 
StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(6, 7, 8));
+        StreamsGroupDescribeResponseData.TaskIds activeTasks2 = new 
StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(3, 4, 5));
+        StreamsGroupDescribeResponseData.TaskIds standbyTasks2 = new 
StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(6, 7, 8));
+        StreamsGroupDescribeResponseData.TaskIds warmupTasks2 = new 
StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(0, 1, 2));
+        StreamsGroupDescribeResponseData.Assignment memberAssignment = new 
StreamsGroupDescribeResponseData.Assignment()
+            .setActiveTasks(singletonList(activeTasks1))
+            .setStandbyTasks(singletonList(standbyTasks1))
+            .setWarmupTasks(singletonList(warmupTasks1));
+        StreamsGroupDescribeResponseData.Assignment targetAssignment = new 
StreamsGroupDescribeResponseData.Assignment()
+            .setActiveTasks(singletonList(activeTasks2))
+            .setStandbyTasks(singletonList(standbyTasks2))
+            .setWarmupTasks(singletonList(warmupTasks2));
+        StreamsGroupDescribeResponseData.Member memberOne = new 
StreamsGroupDescribeResponseData.Member()
+            .setMemberId("0")
+            .setMemberEpoch(1)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setClientId("clientId0")
+            .setClientHost("clientHost")
+            .setTopologyEpoch(0)
+            .setProcessId("processId")
+            .setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint()
+                .setHost("localhost")
+                .setPort(8080)
+            )
+            .setClientTags(Collections.singletonList(new 
StreamsGroupDescribeResponseData.KeyValue()
+                .setKey("key")
+                .setValue("value")
+            ))
+            .setTaskOffsets(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskOffset()
+                .setSubtopologyId("my_subtopology")
+                .setPartition(0)
+                .setOffset(0)
+            ))
+            .setTaskEndOffsets(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskOffset()
+                .setSubtopologyId("my_subtopology")
+                .setPartition(0)
+                .setOffset(1)
+            ))
+            .setAssignment(memberAssignment)
+            .setTargetAssignment(targetAssignment)
+            .setIsClassic(true);
+
+        StreamsGroupDescribeResponseData.Member memberTwo = new 
StreamsGroupDescribeResponseData.Member()
+            .setMemberId("1")
+            .setMemberEpoch(2)
+            .setInstanceId(null)
+            .setRackId(null)
+            .setClientId("clientId1")
+            .setClientHost("clientHost")
+            .setTopologyEpoch(1)
+            .setProcessId("processId2")
+            .setUserEndpoint(null)
+            .setClientTags(Collections.emptyList())
+            .setTaskOffsets(Collections.emptyList())
+            .setTaskEndOffsets(Collections.emptyList())
+            .setAssignment(new StreamsGroupDescribeResponseData.Assignment())
+            .setTargetAssignment(new 
StreamsGroupDescribeResponseData.Assignment())
+            .setIsClassic(false);
+
+        StreamsGroupDescribeResponseData.Subtopology subtopologyDescription = 
new StreamsGroupDescribeResponseData.Subtopology()
+            .setSubtopologyId("my_subtopology")
+            .setSourceTopics(Collections.singletonList("my_source_topic"))
+            
.setRepartitionSinkTopics(Collections.singletonList("my_repartition_sink_topic"))
+            .setStateChangelogTopics(Collections.singletonList(
+                new StreamsGroupDescribeResponseData.TopicInfo()
+                    .setName("my_changelog_topic")
+                    .setPartitions(0)
+                    .setReplicationFactor((short) 3)
+                    .setTopicConfigs(Collections.singletonList(new 
StreamsGroupDescribeResponseData.KeyValue()
+                        .setKey("key1")
+                        .setValue("value1")
+                    ))
+            ))
+            .setRepartitionSourceTopics(Collections.singletonList(
+                new StreamsGroupDescribeResponseData.TopicInfo()
+                    .setName("my_repartition_topic")
+                    .setPartitions(99)
+                    .setReplicationFactor((short) 0)
+                    .setTopicConfigs(Collections.emptyList())
+            ));
+
+        data = new StreamsGroupDescribeResponseData();
+        data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(GROUP_ID)
+            .setGroupState(GroupState.STABLE.toString())
+            .setMembers(asList(memberOne, memberTwo))
+            .setTopology(new StreamsGroupDescribeResponseData.Topology()
+                .setEpoch(1)
+                
.setSubtopologies(Collections.singletonList(subtopologyDescription))
+            )
+            .setGroupEpoch(2)
+            .setAssignmentEpoch(1));
+        return data;
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index b61915e4a8d..9bd566ac11c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1429,6 +1429,11 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public synchronized DescribeStreamsGroupsResult 
describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions 
options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+    
     @Override
     public synchronized DescribeClassicGroupsResult 
describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions 
options) {
         throw new UnsupportedOperationException("Not implemented yet");
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index ede78ca71ab..60a042aa28d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -91,6 +91,8 @@ import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
+import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsOptions;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.DescribeTransactionsOptions;
@@ -448,6 +450,11 @@ public class TestingMetricsInterceptingAdminClient extends 
AdminClient {
     public DescribeShareGroupsResult describeShareGroups(final 
Collection<String> groupIds, final DescribeShareGroupsOptions options) {
         return adminDelegate.describeShareGroups(groupIds, options);
     }
+    
+    @Override
+    public DescribeStreamsGroupsResult describeStreamsGroups(final 
Collection<String> groupIds, final DescribeStreamsGroupsOptions options) {
+        return adminDelegate.describeStreamsGroups(groupIds, options);
+    }
 
     @Override
     public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String 
groupId, final Map<TopicPartition, Long> offsets, final 
AlterShareGroupOffsetsOptions options) {

Reply via email to