This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9099301fbd KAFKA-16716: Add admin list and describe share groups 
(#16827)
b9099301fbd is described below

commit b9099301fbd0a5b35e46b4f9ac026263c753cce6
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Aug 7 20:16:25 2024 +0100

    KAFKA-16716: Add admin list and describe share groups (#16827)
    
    Adds the methods to the admin client for listing and describing share 
groups.
    There are some unit tests, but integration tests will be in a follow-on PR.
    
    Reviewers:  Manikumar Reddy <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  51 ++-
 .../clients/admin/DescribeShareGroupsOptions.java  |  41 +++
 .../clients/admin/DescribeShareGroupsResult.java   |  68 ++++
 .../kafka/clients/admin/ForwardingAdmin.java       |  10 +
 .../kafka/clients/admin/KafkaAdminClient.java      | 142 ++++++++
 .../clients/admin/ListShareGroupsOptions.java      |  51 +++
 .../kafka/clients/admin/ListShareGroupsResult.java |  84 +++++
 .../kafka/clients/admin/ShareGroupDescription.java | 124 +++++++
 .../kafka/clients/admin/ShareGroupListing.java     |  97 ++++++
 .../internals/DescribeShareGroupsHandler.java      | 198 +++++++++++
 .../java/org/apache/kafka/common/GroupType.java    |   3 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 363 +++++++++++++++++++++
 .../kafka/clients/admin/MockAdminClient.java       |  10 +
 13 files changed, 1237 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 8d62069b279..71166ed0872 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -875,23 +875,23 @@ public interface Admin extends AutoCloseable {
     DescribeDelegationTokenResult 
describeDelegationToken(DescribeDelegationTokenOptions options);
 
     /**
-     * Describe some group IDs in the cluster.
+     * Describe some consumer groups in the cluster.
      *
      * @param groupIds The IDs of the groups to describe.
      * @param options  The options to use when describing the groups.
-     * @return The DescribeConsumerGroupResult.
+     * @return The DescribeConsumerGroupsResult.
      */
     DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> 
groupIds,
                                                         
DescribeConsumerGroupsOptions options);
 
     /**
-     * Describe some group IDs in the cluster, with the default options.
+     * Describe some consumer groups in the cluster, with the default options.
      * <p>
      * This is a convenience method for {@link 
#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}
      * with default options. See the overload for more details.
      *
      * @param groupIds The IDs of the groups to describe.
-     * @return The DescribeConsumerGroupResult.
+     * @return The DescribeConsumerGroupsResult.
      */
     default DescribeConsumerGroupsResult 
describeConsumerGroups(Collection<String> groupIds) {
         return describeConsumerGroups(groupIds, new 
DescribeConsumerGroupsOptions());
@@ -1767,6 +1767,49 @@ public interface Admin extends AutoCloseable {
         RemoveRaftVoterOptions options
     );
 
+    /**
+     * Describe some share groups in the cluster.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @param options  The options to use when describing the groups.
+     * @return The DescribeShareGroupsResult.
+     */
+    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
+                                                  DescribeShareGroupsOptions 
options);
+
+    /**
+     * Describe some share groups in the cluster, with the default options.
+     * <p>
+     * This is a convenience method for {@link 
#describeShareGroups(Collection, DescribeShareGroupsOptions)}
+     * with default options. See the overload for more details.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @return The DescribeShareGroupsResult.
+     */
+    default DescribeShareGroupsResult describeShareGroups(Collection<String> 
groupIds) {
+        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
+    }
+
+    /**
+     * List the share groups available in the cluster.
+     *
+     * @param options The options to use when listing the share groups.
+     * @return The ListShareGroupsResult.
+     */
+    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);
+
+    /**
+     * List the share groups available in the cluster with the default options.
+     * <p>
+     * This is a convenience method for {@link 
#listShareGroups(ListShareGroupsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @return The ListShareGroupsResult.
+     */
+    default ListShareGroupsResult listShareGroups() {
+        return listShareGroups(new ListShareGroupsOptions());
+    }
+
     /**
      * Get the metrics kept by the adminClient
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsOptions.java
new file mode 100644
index 00000000000..2ac37bafed1
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for {@link Admin#describeShareGroups(Collection, 
DescribeShareGroupsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeShareGroupsOptions extends 
AbstractOptions<DescribeShareGroupsOptions> {
+    private boolean includeAuthorizedOperations;
+
+    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean 
includeAuthorizedOperations) {
+        this.includeAuthorizedOperations = includeAuthorizedOperations;
+        return this;
+    }
+
+    public boolean includeAuthorizedOperations() {
+        return includeAuthorizedOperations;
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsResult.java
new file mode 100644
index 00000000000..0536b9e3f9d
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The result of the {@link KafkaAdminClient#describeShareGroups(Collection, 
DescribeShareGroupsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeShareGroupsResult {
+
+    private final Map<String, KafkaFuture<ShareGroupDescription>> futures;
+
+    public DescribeShareGroupsResult(final Map<String, 
KafkaFuture<ShareGroupDescription>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from group id to futures which yield share group 
descriptions.
+     */
+    public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
+        return new HashMap<>(futures);
+    }
+
+    /**
+     * Return a future which yields all ShareGroupDescription objects, if all 
the describes succeed.
+     */
+    public KafkaFuture<Map<String, ShareGroupDescription>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+            nil -> {
+                Map<String, ShareGroupDescription> descriptions = new 
HashMap<>(futures.size());
+                futures.forEach((key, future) -> {
+                    try {
+                        descriptions.put(key, future.get());
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, since the 
KafkaFuture#allOf already ensured
+                        // that all of the futures completed successfully.
+                        throw new RuntimeException(e);
+                    }
+                });
+                return descriptions;
+            });
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index ad15c22498f..1f842444802 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -298,6 +298,16 @@ public class ForwardingAdmin implements Admin {
         return delegate.removeRaftVoter(voterId, voterDirectoryId, options);
     }
 
+    @Override
+    public DescribeShareGroupsResult describeShareGroups(Collection<String> 
groupIds, DescribeShareGroupsOptions options) {
+        return delegate.describeShareGroups(groupIds, options);
+    }
+
+    @Override
+    public ListShareGroupsResult listShareGroups(ListShareGroupsOptions 
options) {
+        return delegate.listShareGroups(options);
+    }
+
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
         return delegate.metrics();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3672b61647d..d6dea2c57a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -50,6 +50,7 @@ import 
org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
+import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
 import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
 import 
org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
@@ -68,6 +69,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.ShareGroupState;
 import org.apache.kafka.common.TopicCollection;
 import org.apache.kafka.common.TopicCollection.TopicIdCollection;
 import org.apache.kafka.common.TopicCollection.TopicNameCollection;
@@ -3671,6 +3673,146 @@ public class KafkaAdminClient extends AdminClient {
         return new 
DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), 
partitions);
     }
 
+    private static final class ListShareGroupsResults {
+        private final List<Throwable> errors;
+        private final HashMap<String, ShareGroupListing> listings;
+        private final HashSet<Node> remaining;
+        private final KafkaFutureImpl<Collection<Object>> future;
+
+        ListShareGroupsResults(Collection<Node> leaders,
+                               KafkaFutureImpl<Collection<Object>> future) {
+            this.errors = new ArrayList<>();
+            this.listings = new HashMap<>();
+            this.remaining = new HashSet<>(leaders);
+            this.future = future;
+            tryComplete();
+        }
+
+        synchronized void addError(Throwable throwable, Node node) {
+            ApiError error = ApiError.fromThrowable(throwable);
+            if (error.message() == null || error.message().isEmpty()) {
+                errors.add(error.error().exception("Error listing groups on " 
+ node));
+            } else {
+                errors.add(error.error().exception("Error listing groups on " 
+ node + ": " + error.message()));
+            }
+        }
+
+        synchronized void addListing(ShareGroupListing listing) {
+            listings.put(listing.groupId(), listing);
+        }
+
+        synchronized void tryComplete(Node leader) {
+            remaining.remove(leader);
+            tryComplete();
+        }
+
+        private synchronized void tryComplete() {
+            if (remaining.isEmpty()) {
+                ArrayList<Object> results = new ArrayList<>(listings.values());
+                results.addAll(errors);
+                future.complete(results);
+            }
+        }
+    }
+
+    @Override
+    public DescribeShareGroupsResult describeShareGroups(final 
Collection<String> groupIds,
+                                                         final 
DescribeShareGroupsOptions options) {
+        SimpleAdminApiFuture<CoordinatorKey, ShareGroupDescription> future =
+                DescribeShareGroupsHandler.newFuture(groupIds);
+        DescribeShareGroupsHandler handler = new 
DescribeShareGroupsHandler(options.includeAuthorizedOperations(), logContext);
+        invokeDriver(handler, future, options.timeoutMs);
+        return new DescribeShareGroupsResult(future.all().entrySet().stream()
+                .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
+    }
+
+    @Override
+    public ListShareGroupsResult listShareGroups(ListShareGroupsOptions 
options) {
+        final KafkaFutureImpl<Collection<Object>> all = new 
KafkaFutureImpl<>();
+        final long nowMetadata = time.milliseconds();
+        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+        runnable.call(new Call("findAllBrokers", deadline, new 
LeastLoadedNodeProvider()) {
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new MetadataRequestData()
+                        .setTopics(Collections.emptyList())
+                        .setAllowAutoTopicCreation(true));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse metadataResponse = (MetadataResponse) 
abstractResponse;
+                Collection<Node> nodes = metadataResponse.brokers();
+                if (nodes.isEmpty())
+                    throw new StaleMetadataException("Metadata fetch failed 
due to missing broker list");
+
+                HashSet<Node> allNodes = new HashSet<>(nodes);
+                final ListShareGroupsResults results = new 
ListShareGroupsResults(allNodes, all);
+
+                for (final Node node : allNodes) {
+                    final long nowList = time.milliseconds();
+                    runnable.call(new Call("listShareGroups", deadline, new 
ConstantNodeIdProvider(node.id())) {
+                        @Override
+                        ListGroupsRequest.Builder createRequest(int timeoutMs) 
{
+                            List<String> states = options.states()
+                                    .stream()
+                                    .map(ShareGroupState::toString)
+                                    .collect(Collectors.toList());
+                            List<String> types = 
Collections.singletonList(GroupType.SHARE.toString());
+                            return new ListGroupsRequest.Builder(new 
ListGroupsRequestData()
+                                    .setStatesFilter(states)
+                                    .setTypesFilter(types)
+                            );
+                        }
+
+                        private void 
maybeAddShareGroup(ListGroupsResponseData.ListedGroup group) {
+                            final String groupId = group.groupId();
+                            final Optional<ShareGroupState> state = 
group.groupState().isEmpty()
+                                    ? Optional.empty()
+                                    : 
Optional.of(ShareGroupState.parse(group.groupState()));
+                            final ShareGroupListing groupListing = new 
ShareGroupListing(groupId, state);
+                            results.addListing(groupListing);
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) 
{
+                            final ListGroupsResponse response = 
(ListGroupsResponse) abstractResponse;
+                            synchronized (results) {
+                                Errors error = 
Errors.forCode(response.data().errorCode());
+                                if (error == 
Errors.COORDINATOR_LOAD_IN_PROGRESS || error == 
Errors.COORDINATOR_NOT_AVAILABLE) {
+                                    throw error.exception();
+                                } else if (error != Errors.NONE) {
+                                    results.addError(error.exception(), node);
+                                } else {
+                                    for (ListGroupsResponseData.ListedGroup 
group : response.data().groups()) {
+                                        maybeAddShareGroup(group);
+                                    }
+                                }
+                                results.tryComplete(node);
+                            }
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            synchronized (results) {
+                                results.addError(throwable, node);
+                                results.tryComplete(node);
+                            }
+                        }
+                    }, nowList);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                KafkaException exception = new KafkaException("Failed to find 
brokers to send ListGroups", throwable);
+                all.complete(Collections.singletonList(exception));
+            }
+        }, nowMetadata);
+
+        return new ListShareGroupsResult(all);
+    }
+
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
         return Collections.unmodifiableMap(this.metrics.metrics());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java
new file mode 100644
index 00000000000..61f0aa40eb2
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.ShareGroupState;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Options for {@link Admin#listShareGroups(ListShareGroupsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupsOptions extends 
AbstractOptions<ListShareGroupsOptions> {
+
+    private Set<ShareGroupState> states = Collections.emptySet();
+
+    /**
+     * If states is set, only groups in these states will be returned. 
Otherwise, all groups are returned.
+     */
+    public ListShareGroupsOptions inStates(Set<ShareGroupState> states) {
+        this.states = (states == null) ? Collections.emptySet() : new 
HashSet<>(states);
+        return this;
+    }
+
+    /**
+     * Return the set of states that are requested, or an empty set if no 
states have been specified.
+     */
+    public Set<ShareGroupState> states() {
+        return states;
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsResult.java
new file mode 100644
index 00000000000..10d325bae54
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsResult.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * The result of the {@link Admin#listShareGroups(ListShareGroupsOptions)} 
call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListShareGroupsResult {
+
+    private final KafkaFutureImpl<Collection<ShareGroupListing>> all;
+    private final KafkaFutureImpl<Collection<ShareGroupListing>> valid;
+    private final KafkaFutureImpl<Collection<Throwable>> errors;
+
+    ListShareGroupsResult(KafkaFuture<Collection<Object>> future) {
+        this.all = new KafkaFutureImpl<>();
+        this.valid = new KafkaFutureImpl<>();
+        this.errors = new KafkaFutureImpl<>();
+        future.thenApply((KafkaFuture.BaseFunction<Collection<Object>, Void>) 
results -> {
+            ArrayList<Throwable> curErrors = new ArrayList<>();
+            ArrayList<ShareGroupListing> curValid = new ArrayList<>();
+            for (Object resultObject : results) {
+                if (resultObject instanceof Throwable) {
+                    curErrors.add((Throwable) resultObject);
+                } else {
+                    curValid.add((ShareGroupListing) resultObject);
+                }
+            }
+            if (!curErrors.isEmpty()) {
+                all.completeExceptionally(curErrors.get(0));
+            } else {
+                all.complete(curValid);
+            }
+            valid.complete(curValid);
+            errors.complete(curErrors);
+            return null;
+        });
+    }
+
+    /**
+     * Returns a future that yields either an exception, or the full set of 
share group listings.
+     */
+    public KafkaFuture<Collection<ShareGroupListing>> all() {
+        return all;
+    }
+
+    /**
+     * Returns a future which yields just the valid listings.
+     */
+    public KafkaFuture<Collection<ShareGroupListing>> valid() {
+        return valid;
+    }
+
+    /**
+     * Returns a future which yields just the errors which occurred.
+     */
+    public KafkaFuture<Collection<Throwable>> errors() {
+        return errors;
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
new file mode 100644
index 00000000000..30d88e31192
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.ShareGroupState;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A detailed description of a single share group in the cluster.
+ */
+@InterfaceStability.Evolving
+public class ShareGroupDescription {
+    private final String groupId;
+    private final Collection<MemberDescription> members;
+    private final ShareGroupState state;
+    private final Node coordinator;
+    private final Set<AclOperation> authorizedOperations;
+
+    public ShareGroupDescription(String groupId,
+                                 Collection<MemberDescription> members,
+                                 ShareGroupState state,
+                                 Node coordinator) {
+        this(groupId, members, state, coordinator, Collections.emptySet());
+    }
+
+    public ShareGroupDescription(String groupId,
+                                 Collection<MemberDescription> members,
+                                 ShareGroupState state,
+                                 Node coordinator,
+                                 Set<AclOperation> authorizedOperations) {
+        this.groupId = groupId == null ? "" : groupId;
+        this.members = members == null ? Collections.emptyList() :
+            Collections.unmodifiableList(new ArrayList<>(members));
+        this.state = state;
+        this.coordinator = coordinator;
+        this.authorizedOperations = authorizedOperations;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final ShareGroupDescription that = (ShareGroupDescription) o;
+        return Objects.equals(groupId, that.groupId) &&
+            Objects.equals(members, that.members) &&
+            state == that.state &&
+            Objects.equals(coordinator, that.coordinator) &&
+            Objects.equals(authorizedOperations, that.authorizedOperations);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(groupId, members, state, coordinator, 
authorizedOperations);
+    }
+
+    /**
+     * The id of the share group.
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * A list of the members of the share group.
+     */
+    public Collection<MemberDescription> members() {
+        return members;
+    }
+
+    /**
+     * The share group state, or UNKNOWN if the state is too new for us to 
parse.
+     */
+    public ShareGroupState state() {
+        return state;
+    }
+
+    /**
+     * The share group coordinator, or null if the coordinator is not known.
+     */
+    public Node coordinator() {
+        return coordinator;
+    }
+
+    /**
+     * authorizedOperations for this group, or null if that information is not 
known.
+     */
+    public Set<AclOperation> authorizedOperations() {
+        return authorizedOperations;
+    }
+
+    @Override
+    public String toString() {
+        return "(groupId=" + groupId +
+            ", members=" + 
members.stream().map(MemberDescription::toString).collect(Collectors.joining(","))
 +
+            ", state=" + state +
+            ", coordinator=" + coordinator +
+            ", authorizedOperations=" + authorizedOperations +
+            ")";
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java
new file mode 100644
index 00000000000..54af225d2ca
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.ShareGroupState;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A listing of a share group in the cluster.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ShareGroupListing {
+    private final String groupId;
+    private final Optional<ShareGroupState> state;
+
+    /**
+     * Create an instance with the specified parameters and no state.
+     *
+     * @param groupId Group Id
+     */
+    public ShareGroupListing(String groupId) {
+        this(groupId, Optional.empty());
+    }
+
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param groupId Group Id
+     * @param state The state of the share group
+     * @throws NullPointerException if {@code state} is null
+     */
+    public ShareGroupListing(String groupId, Optional<ShareGroupState> state) {
+        this.groupId = groupId;
+        this.state = Objects.requireNonNull(state);
+    }
+
+    /**
+     * The id of the share group.
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * The share group state; empty when the listing was created without one.
+     */
+    public Optional<ShareGroupState> state() {
+        return state;
+    }
+
+    @Override
+    public String toString() {
+        return "(" +
+            "groupId='" + groupId + '\'' +
+            ", state=" + state +
+            ')';
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(groupId, state);
+    }
+
+    /**
+     * Two listings are equal when they are of exactly the same class and have equal
+     * group id and state.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        // The exact-class comparison also rejects null and every other type, so no
+        // additional instanceof / getClass checks are needed before the cast.
+        if (o == null || getClass() != o.getClass())
+            return false;
+        ShareGroupListing that = (ShareGroupListing) o;
+        return Objects.equals(groupId, that.groupId) &&
+            Objects.equals(state, that.state);
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
new file mode 100644
index 00000000000..97c44f3e784
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.ShareGroupDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.ShareGroupState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.message.ShareGroupDescribeRequestData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Batched admin API handler behind {@code describeShareGroups}.
+ * <p>
+ * Group ids are wrapped in {@link CoordinatorKey}s, coordinators are located with a
+ * {@link CoordinatorStrategy} of type {@link CoordinatorType#GROUP}, and every group in a
+ * {@link ShareGroupDescribeResponse} is converted into a {@link ShareGroupDescription}.
+ */
+public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, ShareGroupDescription> {
+
+    private final boolean includeAuthorizedOperations;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    /**
+     * @param includeAuthorizedOperations value copied into the {@code IncludeAuthorizedOperations}
+     *                                    field of every request built by this handler
+     * @param logContext context used to create this handler's logger and its lookup strategy
+     */
+    public DescribeShareGroupsHandler(
+          boolean includeAuthorizedOperations,
+          LogContext logContext) {
+        this.includeAuthorizedOperations = includeAuthorizedOperations;
+        this.log = logContext.logger(DescribeShareGroupsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+    }
+
+    /** Wraps each group id in a group {@link CoordinatorKey}. */
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
+        return groupIds.stream()
+            .map(CoordinatorKey::byGroupId)
+            .collect(Collectors.toSet());
+    }
+
+    /**
+     * Creates the future that tracks one result per requested group id.
+     *
+     * @param groupIds ids of the share groups to describe
+     */
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ShareGroupDescription> newFuture(Collection<String> groupIds) {
+        return AdminApiFuture.forKeys(buildKeySet(groupIds));
+    }
+
+    @Override
+    public String apiName() {
+        return "describeShareGroups";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    /**
+     * Builds one {@code ShareGroupDescribe} request covering all the given keys.
+     *
+     * @param coordinatorId id of the coordinator the batch is destined for; not used to build the request
+     * @param keys group coordinator keys whose groups should be described
+     * @return a builder for the request
+     * @throws IllegalArgumentException if any key is not of type {@code GROUP}
+     */
+    @Override
+    public ShareGroupDescribeRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
+        List<String> groupIds = keys.stream().map(key -> {
+            if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
+                // The key is checked against GROUP, so report it as a group coordinator key.
+                throw new IllegalArgumentException("Invalid group coordinator key " + key +
+                    " when building `DescribeShareGroups` request");
+            }
+            return key.idValue;
+        }).collect(Collectors.toList());
+        ShareGroupDescribeRequestData data = new ShareGroupDescribeRequestData()
+            .setGroupIds(groupIds)
+            .setIncludeAuthorizedOperations(includeAuthorizedOperations);
+        return new ShareGroupDescribeRequest.Builder(data, true);
+    }
+
+    /**
+     * Converts every group in the response into either a completed description, a failure,
+     * or a key to unmap so its coordinator is looked up again.
+     * <p>
+     * Only groups present in the response are processed; the {@code groupIds} argument is not consulted.
+     *
+     * @param coordinator node that answered; recorded as the coordinator of each described group
+     * @param groupIds keys that were sent in the request
+     * @param abstractResponse the response, cast to {@link ShareGroupDescribeResponse}
+     */
+    @Override
+    public ApiResult<CoordinatorKey, ShareGroupDescription> handleResponse(
+            Node coordinator,
+            Set<CoordinatorKey> groupIds,
+            AbstractResponse abstractResponse) {
+        final ShareGroupDescribeResponse response = (ShareGroupDescribeResponse) abstractResponse;
+        final Map<CoordinatorKey, ShareGroupDescription> completed = new HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+
+        for (ShareGroupDescribeResponseData.DescribedGroup describedGroup : response.data().groups()) {
+            CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId());
+            Errors error = Errors.forCode(describedGroup.errorCode());
+            if (error != Errors.NONE) {
+                handleError(groupIdKey, error, describedGroup.errorMessage(), failed, groupsToUnmap);
+                continue;
+            }
+
+            final List<MemberDescription> memberDescriptions = new ArrayList<>(describedGroup.members().size());
+            final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
+
+            describedGroup.members().forEach(groupMember ->
+                memberDescriptions.add(new MemberDescription(
+                    groupMember.memberId(),
+                    groupMember.clientId(),
+                    groupMember.clientHost(),
+                    new MemberAssignment(convertAssignment(groupMember.assignment()))
+                ))
+            );
+
+            final ShareGroupDescription shareGroupDescription =
+                new ShareGroupDescription(groupIdKey.idValue,
+                    memberDescriptions,
+                    ShareGroupState.parse(describedGroup.groupState()),
+                    coordinator,
+                    authorizedOperations);
+            completed.put(groupIdKey, shareGroupDescription);
+        }
+
+        return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
+    }
+
+    /** Flattens the per-topic partition lists of an assignment into a set of {@link TopicPartition}s. */
+    private Set<TopicPartition> convertAssignment(ShareGroupDescribeResponseData.Assignment assignment) {
+        return assignment.topicPartitions().stream().flatMap(topic ->
+            topic.partitions().stream().map(partition ->
+                new TopicPartition(topic.topicName(), partition)
+            )
+        ).collect(Collectors.toSet());
+    }
+
+    /**
+     * Classifies a per-group error: the group is added to {@code failed}, added to
+     * {@code groupsToUnmap}, or left in neither collection (coordinator still loading).
+     */
+    private void handleError(
+            CoordinatorKey groupId,
+            Errors error,
+            String errorMsg,
+            Map<CoordinatorKey, Throwable> failed,
+            Set<CoordinatorKey> groupsToUnmap) {
+        switch (error) {
+            case GROUP_AUTHORIZATION_FAILED:
+                log.debug("`DescribeShareGroups` request for group id {} failed due to error {}", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("`DescribeShareGroups` request for group id {} failed because the coordinator " +
+                    "is still in the process of loading state. Will retry", groupId.idValue);
+                break;
+
+            case COORDINATOR_NOT_AVAILABLE:
+            case NOT_COORDINATOR:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`DescribeShareGroups` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", groupId.idValue, error);
+                groupsToUnmap.add(groupId);
+                break;
+
+            case GROUP_ID_NOT_FOUND:
+                log.error("`DescribeShareGroups` request for group id {} failed because the group does not exist.", groupId.idValue);
+                failed.put(groupId, error.exception(errorMsg));
+                break;
+
+            default:
+                log.error("`DescribeShareGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
+        }
+    }
+
+    /**
+     * Decodes the authorized-operations bit field of a described group.
+     *
+     * @return null when the field equals {@code AUTHORIZED_OPERATIONS_OMITTED}; otherwise the decoded
+     *         operations with {@code UNKNOWN}, {@code ALL} and {@code ANY} filtered out
+     */
+    private Set<AclOperation> validAclOperations(final int authorizedOperations) {
+        if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
+            return null;
+        }
+        return Utils.from32BitField(authorizedOperations)
+            .stream()
+            .map(AclOperation::fromCode)
+            .filter(operation -> operation != AclOperation.UNKNOWN
+                && operation != AclOperation.ALL
+                && operation != AclOperation.ANY)
+            .collect(Collectors.toSet());
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/GroupType.java 
b/clients/src/main/java/org/apache/kafka/common/GroupType.java
index 10bb1fc348b..eeb79ea2825 100644
--- a/clients/src/main/java/org/apache/kafka/common/GroupType.java
+++ b/clients/src/main/java/org/apache/kafka/common/GroupType.java
@@ -25,7 +25,8 @@ import java.util.stream.Collectors;
 public enum GroupType {
     UNKNOWN("Unknown"),
     CONSUMER("Consumer"),
-    CLASSIC("Classic");
+    CLASSIC("Classic"),
+    SHARE("Share");
 
     private static final Map<String, GroupType> NAME_TO_ENUM = 
Arrays.stream(values())
         .collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT), 
Function.identity()));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index dc74a178442..bc4ccb52f8b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.ShareGroupState;
 import org.apache.kafka.common.TopicCollection;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
@@ -141,6 +142,7 @@ import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
 import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -213,6 +215,7 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
 import org.apache.kafka.common.requests.UnregisterBrokerResponse;
 import org.apache.kafka.common.requests.UpdateFeaturesRequest;
 import org.apache.kafka.common.requests.UpdateFeaturesResponse;
@@ -4629,6 +4632,357 @@ public class KafkaAdminClientTest {
         }
     }
 
+    /**
+     * Describes a single share group while the coordinator lookup and the describe call
+     * each hit retriable errors first, then checks the members of the final description.
+     */
+    @Test
+    public void testDescribeShareGroups() throws Exception {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Retriable FindCoordinatorResponse errors should be retried
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            ShareGroupDescribeResponseData data = new ShareGroupDescribeResponseData();
+
+            // Retriable errors should be retried
+            data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()));
+            env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(data));
+
+            /*
+             * We need to return two responses here, one with NOT_COORDINATOR error when calling describe share group
+             * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
+             * FindCoordinatorResponse.
+             *
+             * And the same reason for COORDINATOR_NOT_AVAILABLE error response
+             */
+            data = new ShareGroupDescribeResponseData();
+            data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setErrorCode(Errors.NOT_COORDINATOR.code()));
+            env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(data));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            data = new ShareGroupDescribeResponseData();
+            data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+            env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(data));
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            data = new ShareGroupDescribeResponseData();
+            ShareGroupDescribeResponseData.TopicPartitions topicPartitions = new ShareGroupDescribeResponseData.TopicPartitions()
+                .setTopicName("my_topic")
+                .setPartitions(asList(0, 1, 2));
+            ShareGroupDescribeResponseData.Assignment memberAssignment = new ShareGroupDescribeResponseData.Assignment()
+                .setTopicPartitions(asList(topicPartitions));
+            ShareGroupDescribeResponseData.Member memberOne = new ShareGroupDescribeResponseData.Member()
+                .setMemberId("0")
+                .setClientId("clientId0")
+                .setClientHost("clientHost")
+                .setAssignment(memberAssignment);
+            ShareGroupDescribeResponseData.Member memberTwo = new ShareGroupDescribeResponseData.Member()
+                .setMemberId("1")
+                .setClientId("clientId1")
+                .setClientHost("clientHost")
+                .setAssignment(memberAssignment);
+
+            final List<TopicPartition> expectedTopicPartitions = new ArrayList<>();
+            expectedTopicPartitions.add(0, new TopicPartition("my_topic", 0));
+            expectedTopicPartitions.add(1, new TopicPartition("my_topic", 1));
+            expectedTopicPartitions.add(2, new TopicPartition("my_topic", 2));
+
+            List<MemberDescription> expectedMemberDescriptions = new ArrayList<>();
+            expectedMemberDescriptions.add(convertToMemberDescriptions(memberOne,
+                new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
+            expectedMemberDescriptions.add(convertToMemberDescriptions(memberTwo,
+                new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
+            // The successful response is the last one queued for the describe call.
+            data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setGroupState(ShareGroupState.STABLE.toString())
+                .setMembers(asList(memberOne, memberTwo)));
+
+            env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(data));
+
+            final DescribeShareGroupsResult result = env.adminClient().describeShareGroups(singletonList(GROUP_ID));
+            final ShareGroupDescription groupDescription = result.describedGroups().get(GROUP_ID).get();
+
+            assertEquals(1, result.describedGroups().size());
+            assertEquals(GROUP_ID, groupDescription.groupId());
+            assertEquals(2, groupDescription.members().size());
+            assertEquals(expectedMemberDescriptions, groupDescription.members());
+        }
+    }
+
+    /**
+     * A described group whose authorized operations field is AUTHORIZED_OPERATIONS_OMITTED
+     * must surface as a description with null authorizedOperations().
+     */
+    @Test
+    public void testDescribeShareGroupsWithAuthorizedOperationsOmitted() 
throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            ShareGroupDescribeResponseData data = new 
ShareGroupDescribeResponseData();
+
+            data.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                
.setAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
+
+            env.kafkaClient().prepareResponse(new 
ShareGroupDescribeResponse(data));
+
+            final DescribeShareGroupsResult result = 
env.adminClient().describeShareGroups(singletonList(GROUP_ID));
+            final ShareGroupDescription groupDescription = 
result.describedGroups().get(GROUP_ID).get();
+
+            assertNull(groupDescription.authorizedOperations());
+        }
+    }
+
+    /**
+     * Describes two share groups in one call and checks that the result is keyed by both
+     * group ids. Only the key set is asserted; the per-group futures are not awaited.
+     */
+    @Test
+    public void testDescribeMultipleShareGroups() {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            ShareGroupDescribeResponseData.TopicPartitions topicPartitions = 
new ShareGroupDescribeResponseData.TopicPartitions()
+                .setTopicName("my_topic")
+                .setPartitions(asList(0, 1, 2));
+            final ShareGroupDescribeResponseData.Assignment memberAssignment = 
new ShareGroupDescribeResponseData.Assignment()
+                .setTopicPartitions(asList(topicPartitions));
+            ShareGroupDescribeResponseData group0Data = new 
ShareGroupDescribeResponseData();
+            group0Data.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setGroupState(ShareGroupState.STABLE.toString())
+                .setMembers(asList(
+                    new ShareGroupDescribeResponseData.Member()
+                        .setMemberId("0")
+                        .setClientId("clientId0")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment),
+                    new ShareGroupDescribeResponseData.Member()
+                        .setMemberId("1")
+                        .setClientId("clientId1")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment))));
+
+            ShareGroupDescribeResponseData group1Data = new 
ShareGroupDescribeResponseData();
+            group1Data.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("group-1")
+                .setGroupState(ShareGroupState.STABLE.toString())
+                .setMembers(asList(
+                    new ShareGroupDescribeResponseData.Member()
+                        .setMemberId("0")
+                        .setClientId("clientId0")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment),
+                    new ShareGroupDescribeResponseData.Member()
+                        .setMemberId("1")
+                        .setClientId("clientId1")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment))));
+
+            env.kafkaClient().prepareResponse(new 
ShareGroupDescribeResponse(group0Data));
+            env.kafkaClient().prepareResponse(new 
ShareGroupDescribeResponse(group1Data));
+
+            Collection<String> groups = new HashSet<>();
+            groups.add(GROUP_ID);
+            groups.add("group-1");
+            final DescribeShareGroupsResult result = 
env.adminClient().describeShareGroups(groups);
+            assertEquals(2, result.describedGroups().size());
+            assertEquals(groups, result.describedGroups().keySet());
+        }
+    }
+
+    /**
+     * Lists share groups on a four-node cluster: node 0 returns one group, node 1 returns two
+     * retriable errors and then two groups, node 2 returns one group and node 3 fails with
+     * UNKNOWN_SERVER_ERROR. Expects all() to fail, valid() to hold the four groups and
+     * errors() to hold one error.
+     */
+    @Test
+    public void testListShareGroups() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(4, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Empty metadata response should be retried
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    Collections.emptyList(),
+                    env.cluster().clusterResource().clusterId(),
+                    -1,
+                    Collections.emptyList()));
+
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    Collections.emptyList()));
+
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(Arrays.asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("share-group-1")
+                                .setGroupType(GroupType.SHARE.toString())
+                                .setGroupState("Stable")
+                            ))),
+                env.cluster().nodeById(0));
+
+            // handle retriable errors
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                        .setGroups(Collections.emptyList())
+                ),
+                env.cluster().nodeById(1));
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                        .setGroups(Collections.emptyList())
+                ),
+                env.cluster().nodeById(1));
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(Arrays.asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("share-group-2")
+                                .setGroupType(GroupType.SHARE.toString())
+                                .setGroupState("Stable"),
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("share-group-3")
+                                .setGroupType(GroupType.SHARE.toString())
+                                .setGroupState("Stable")
+                        ))),
+                env.cluster().nodeById(1));
+
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(Arrays.asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("share-group-4")
+                                .setGroupType(GroupType.SHARE.toString())
+                                .setGroupState("Stable")
+                        ))),
+                env.cluster().nodeById(2));
+
+            // fatal error
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                        .setGroups(Collections.emptyList())),
+                env.cluster().nodeById(3));
+
+            final ListShareGroupsResult result = 
env.adminClient().listShareGroups();
+            TestUtils.assertFutureError(result.all(), 
UnknownServerException.class);
+
+            Collection<ShareGroupListing> listings = result.valid().get();
+            assertEquals(4, listings.size());
+
+            Set<String> groupIds = new HashSet<>();
+            for (ShareGroupListing listing : listings) {
+                groupIds.add(listing.groupId());
+                assertTrue(listing.state().isPresent());
+            }
+
+            assertEquals(Utils.mkSet("share-group-1", "share-group-2", 
"share-group-3", "share-group-4"), groupIds);
+            assertEquals(1, result.errors().get().size());
+        }
+    }
+
+    /**
+     * With retries set to "0", a single empty metadata response must make
+     * listShareGroups().all() fail with a KafkaException.
+     */
+    @Test
+    public void testListShareGroupsMetadataFailure() throws Exception {
+        final Cluster cluster = mockCluster(3, 0);
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster,
+                AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Empty metadata causes the request to fail since we have no list of brokers
+            // to send the ListGroups requests to
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    Collections.emptyList(),
+                    env.cluster().clusterResource().clusterId(),
+                    -1,
+                    Collections.emptyList()));
+
+            final ListShareGroupsResult result = 
env.adminClient().listShareGroups();
+            TestUtils.assertFutureError(result.all(), KafkaException.class);
+        }
+    }
+
+    /**
+     * Lists two share groups reported as "Stable" and "Empty" and checks that the listings
+     * carry the matching ShareGroupState values and that no errors are reported.
+     */
+    @Test
+    public void testListShareGroupsWithStates() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(Arrays.asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("share-group-1")
+                                .setGroupType(GroupType.SHARE.toString())
+                                .setGroupState("Stable"),
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("share-group-2")
+                                .setGroupType(GroupType.SHARE.toString())
+                                .setGroupState("Empty")))),
+                    env.cluster().nodeById(0));
+
+            final ListShareGroupsOptions options = new 
ListShareGroupsOptions();
+            final ListShareGroupsResult result = 
env.adminClient().listShareGroups(options);
+            Collection<ShareGroupListing> listings = result.valid().get();
+
+            assertEquals(2, listings.size());
+            List<ShareGroupListing> expected = new ArrayList<>();
+            expected.add(new ShareGroupListing("share-group-1", 
Optional.of(ShareGroupState.STABLE)));
+            expected.add(new ShareGroupListing("share-group-2", 
Optional.of(ShareGroupState.EMPTY)));
+            assertEquals(expected, listings);
+            assertEquals(0, result.errors().get().size());
+        }
+    }
+
+    /**
+     * When the broker advertises LIST_GROUPS only up to version 4, listShareGroups().all()
+     * must fail with UnsupportedVersionException.
+     */
+    @Test
+    public void testListShareGroupsWithStatesOlderBrokerVersion() throws 
Exception {
+        ApiVersion listGroupV4 = new ApiVersion()
+            .setApiKey(ApiKeys.LIST_GROUPS.id)
+            .setMinVersion((short) 0)
+            .setMaxVersion((short) 4);
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            // Check we should not be able to list share groups with broker having version < 5
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Collections.singletonList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("share-group-1")))),
+                env.cluster().nodeById(0));
+            ListShareGroupsOptions options = new ListShareGroupsOptions();
+            ListShareGroupsResult result = 
env.adminClient().listShareGroups(options);
+            TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+        }
+    }
+
     @Test
     public void testIncrementalAlterConfigs()  throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -7754,6 +8108,15 @@ public class KafkaAdminClientTest {
                                      assignment);
     }
 
+    /**
+     * Builds the MemberDescription expected for a share group member from the response data:
+     * member id, client id and client host come from {@code member}, the assignment is the one
+     * supplied, and the second constructor argument is always Optional.empty().
+     */
+    private static MemberDescription 
convertToMemberDescriptions(ShareGroupDescribeResponseData.Member member,
+                                                                 
MemberAssignment assignment) {
+        return new MemberDescription(member.memberId(),
+                                     Optional.empty(),
+                                     member.clientId(),
+                                     member.clientHost(),
+                                     assignment);
+    }
+
     @Test
     public void testListClientMetricsResources() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 8ff65b51a22..6758f21a3f4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1316,6 +1316,16 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    /** Not supported by this mock: always throws UnsupportedOperationException. */
+    @Override
+    public synchronized DescribeShareGroupsResult 
describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions 
options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    /** Not supported by this mock: always throws UnsupportedOperationException. */
+    @Override
+    public synchronized ListShareGroupsResult 
listShareGroups(ListShareGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public synchronized void close(Duration timeout) {}
 

Reply via email to