This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d9f88daf3d KAFKA-17546 Admin.listGroups and kafka-groups.sh (#17626)
3d9f88daf3d is described below

commit 3d9f88daf3de7afec2c825f6def0ac12c8a546c7
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Nov 1 21:37:04 2024 +0000

    KAFKA-17546 Admin.listGroups and kafka-groups.sh (#17626)
    
    This implements the kafka-groups.sh tool and Admin.listGroups method 
defined in KIP-1043.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 bin/kafka-groups.sh                                |  17 +
 bin/windows/kafka-groups.bat                       |  17 +
 .../java/org/apache/kafka/clients/admin/Admin.java |  20 ++
 .../kafka/clients/admin/ForwardingAdmin.java       |   5 +
 .../apache/kafka/clients/admin/GroupListing.java   | 108 ++++++
 .../kafka/clients/admin/KafkaAdminClient.java      | 135 ++++++++
 .../kafka/clients/admin/ListGroupsOptions.java     |  51 +++
 .../kafka/clients/admin/ListGroupsResult.java      | 102 ++++++
 .../kafka/clients/admin/AdminClientTestUtils.java  |  10 +
 .../kafka/clients/admin/GroupListingTest.java      |  48 +++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 260 ++++++++++++++
 .../kafka/clients/admin/MockAdminClient.java       |   9 +
 .../TestingMetricsInterceptingAdminClient.java     |   7 +
 .../java/org/apache/kafka/tools/GroupsCommand.java | 295 ++++++++++++++++
 .../org/apache/kafka/tools/GroupsCommandTest.java  | 381 +++++++++++++++++++++
 15 files changed, 1465 insertions(+)

diff --git a/bin/kafka-groups.sh b/bin/kafka-groups.sh
new file mode 100755
index 00000000000..9c84746e962
--- /dev/null
+++ b/bin/kafka-groups.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Hand off to kafka-run-class.sh located next to this script. The quoting keeps
+# install paths that contain spaces or glob characters from being word-split.
+exec "$(dirname "$0")/kafka-run-class.sh" org.apache.kafka.tools.GroupsCommand "$@"
diff --git a/bin/windows/kafka-groups.bat b/bin/windows/kafka-groups.bat
new file mode 100644
index 00000000000..de079ed1568
--- /dev/null
+++ b/bin/windows/kafka-groups.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+rem Run GroupsCommand through kafka-run-class.bat found in this script's own directory, forwarding all arguments.
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.GroupsCommand %*
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 6470042754c..d6695566bc2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1022,6 +1022,26 @@ public interface Admin extends AutoCloseable {
         return deleteConsumerGroupOffsets(groupId, partitions, new 
DeleteConsumerGroupOffsetsOptions());
     }
 
+    /**
+     * List the groups available in the cluster, using the default options.
+     *
+     * <p>Equivalent to calling {@link #listGroups(ListGroupsOptions)} with a freshly
+     * constructed {@link ListGroupsOptions}. See that overload for more details.
+     *
+     * @return The ListGroupsResult.
+     */
+    default ListGroupsResult listGroups() {
+        final ListGroupsOptions defaultOptions = new ListGroupsOptions();
+        return listGroups(defaultOptions);
+    }
+
+    /**
+     * List the groups available in the cluster.
+     *
+     * <p>The listing can be restricted to particular group types with
+     * {@link ListGroupsOptions#withTypes(java.util.Set)}; when no types are set, all groups are returned.
+     *
+     * @param options The options to use when listing the groups.
+     * @return The ListGroupsResult.
+     */
+    ListGroupsResult listGroups(ListGroupsOptions options);
+
     /**
      * Elect a replica as leader for topic partitions.
      * <p>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index ef34367de86..87e350c5e7a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -309,6 +309,11 @@ public class ForwardingAdmin implements Admin {
         return delegate.listShareGroups(options);
     }
 
+    @Override
+    public ListGroupsResult listGroups(ListGroupsOptions options) {
+        // Pure pass-through: the wrapped delegate does all the work.
+        final ListGroupsResult delegated = delegate.listGroups(options);
+        return delegated;
+    }
+
     @Override
     public void registerMetricForSubscription(KafkaMetric metric) {
         throw new UnsupportedOperationException();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java
new file mode 100644
index 00000000000..0ee2a211e70
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.GroupType;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A listing of a group in the cluster.
+ * <p>
+ * Instances are immutable: the group id, type and protocol are fixed at construction.
+ */
+public class GroupListing {
+    private final String groupId;
+    private final Optional<GroupType> type;
+    private final String protocol;
+
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param groupId Group Id
+     * @param type Group type; must not be null (pass <code>Optional.empty()</code> when unavailable)
+     * @param protocol Protocol
+     * @throws NullPointerException if <code>type</code> is null
+     */
+    public GroupListing(String groupId, Optional<GroupType> type, String protocol) {
+        this.groupId = groupId;
+        this.type = Objects.requireNonNull(type);
+        this.protocol = protocol;
+    }
+
+    /**
+     * The group Id.
+     *
+     * @return Group Id
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * The type of the group.
+     * <p>
+     * If the broker returns a group type which is not recognised, as might
+     * happen when talking to a broker with a later version, the type will be
+     * <code>Optional.of(GroupType.UNKNOWN)</code>. If the broker is earlier than version 2.6.0,
+     * the group type will not be available, and the type will be <code>Optional.empty()</code>.
+     *
+     * @return An Optional containing the type, if available
+     */
+    public Optional<GroupType> type() {
+        return type;
+    }
+
+    /**
+     * The protocol of the group.
+     *
+     * @return The protocol
+     */
+    public String protocol() {
+        return protocol;
+    }
+
+    /**
+     * If the group is a simple consumer group or not.
+     * <p>
+     * A group is reported as simple when its type is {@link GroupType#CLASSIC} and its
+     * protocol is the empty string. A <code>null</code> protocol, which the constructor
+     * accepts, yields <code>false</code> instead of a NullPointerException.
+     *
+     * @return true if the group is a classic group with an empty protocol
+     */
+    public boolean isSimpleConsumerGroup() {
+        boolean classic = type.filter(gt -> gt == GroupType.CLASSIC).isPresent();
+        // Guard against a null protocol so this accessor is as null-tolerant as equals/hashCode/toString.
+        return classic && protocol != null && protocol.isEmpty();
+    }
+
+    @Override
+    public String toString() {
+        return "(" +
+            "groupId='" + groupId + '\'' +
+            ", type=" + type.map(GroupType::toString).orElse("none") +
+            ", protocol='" + protocol + '\'' +
+            ')';
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(groupId, type, protocol);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof GroupListing)) return false;
+        GroupListing that = (GroupListing) o;
+        return Objects.equals(groupId, that.groupId) &&
+            Objects.equals(type, that.type) &&
+            Objects.equals(protocol, that.protocol);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 269ffd1099b..30fdac4687d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3490,6 +3490,141 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeDelegationTokenResult(tokensFuture);
     }
 
+    /**
+     * Accumulates the per-node outcomes of a listGroups call and completes the supplied
+     * future with the combined listings and errors once no nodes remain outstanding.
+     */
+    private static final class ListGroupsResults {
+        private final List<Throwable> errors;
+        private final HashMap<String, GroupListing> listings;
+        private final HashSet<Node> remaining;
+        private final KafkaFutureImpl<Collection<Object>> future;
+
+        ListGroupsResults(Collection<Node> leaders,
+                          KafkaFutureImpl<Collection<Object>> future) {
+            this.errors = new ArrayList<>();
+            this.listings = new HashMap<>();
+            this.remaining = new HashSet<>(leaders);
+            this.future = future;
+            // Completes straight away when there is no node to wait for.
+            tryComplete();
+        }
+
+        /**
+         * Records a failure for the given node, prefixing the message with the node it came from.
+         */
+        synchronized void addError(Throwable throwable, Node node) {
+            final ApiError apiError = ApiError.fromThrowable(throwable);
+            final String detail = apiError.message();
+            final String prefix = "Error listing groups on " + node;
+            final String text = (detail == null || detail.isEmpty()) ? prefix : prefix + ": " + detail;
+            errors.add(apiError.error().exception(text));
+        }
+
+        /**
+         * Stores a listing keyed by group id; a later listing for the same id replaces the earlier one.
+         */
+        synchronized void addListing(GroupListing listing) {
+            listings.put(listing.groupId(), listing);
+        }
+
+        /**
+         * Marks the given node as finished and completes the future if it was the last one.
+         */
+        synchronized void tryComplete(Node leader) {
+            remaining.remove(leader);
+            tryComplete();
+        }
+
+        private synchronized void tryComplete() {
+            if (!remaining.isEmpty()) {
+                return;
+            }
+            // Listings first, then errors, in a single mixed collection.
+            final ArrayList<Object> combined = new ArrayList<>(listings.values());
+            combined.addAll(errors);
+            future.complete(combined);
+        }
+    }
+
+    /**
+     * Lists groups in two steps: a "findAllBrokers" metadata call discovers the brokers, then
+     * one "listGroups" call is issued per broker. Listings and errors from all brokers are
+     * gathered into a single mixed collection that backs the returned {@link ListGroupsResult}.
+     *
+     * @param options supplies the timeout and the optional group-type filter
+     * @return a result whose futures complete once every broker has answered or failed
+     */
+    @Override
+    public ListGroupsResult listGroups(ListGroupsOptions options) {
+        final KafkaFutureImpl<Collection<Object>> all = new 
KafkaFutureImpl<>();
+        final long nowMetadata = time.milliseconds();
+        // One deadline is shared by the metadata call and every per-broker call below.
+        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+        runnable.call(new Call("findAllBrokers", deadline, new 
LeastLoadedNodeProvider()) {
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                // Empty topic list: only the broker list of the response is used.
+                return new MetadataRequest.Builder(new MetadataRequestData()
+                    .setTopics(Collections.emptyList())
+                    .setAllowAutoTopicCreation(true));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse metadataResponse = (MetadataResponse) 
abstractResponse;
+                Collection<Node> nodes = metadataResponse.brokers();
+                // NOTE(review): the unit test expects an empty broker list to be retried;
+                // presumably the Call machinery retries on this exception - confirm.
+                if (nodes.isEmpty())
+                    throw new StaleMetadataException("Metadata fetch failed 
due to missing broker list");
+
+                HashSet<Node> allNodes = new HashSet<>(nodes);
+                final ListGroupsResults results = new 
ListGroupsResults(allNodes, all);
+
+                // Fan out: one ListGroups request pinned to each discovered broker.
+                for (final Node node : allNodes) {
+                    final long nowList = time.milliseconds();
+                    runnable.call(new Call("listGroups", deadline, new 
ConstantNodeIdProvider(node.id())) {
+                        @Override
+                        ListGroupsRequest.Builder createRequest(int timeoutMs) 
{
+                            // The requested group types are sent as their string names.
+                            List<String> groupTypes = options.types()
+                                .stream()
+                                .map(GroupType::toString)
+                                .collect(Collectors.toList());
+                            return new ListGroupsRequest.Builder(new 
ListGroupsRequestData()
+                                .setTypesFilter(groupTypes)
+                            );
+                        }
+
+                        // Despite the name, every listed group is added; a null or empty
+                        // group type in the response becomes Optional.empty().
+                        private void 
maybeAddGroup(ListGroupsResponseData.ListedGroup group) {
+                            final String groupId = group.groupId();
+                            final Optional<GroupType> type;
+                            if (group.groupType() == null || 
group.groupType().isEmpty()) {
+                                type = Optional.empty();
+                            } else {
+                                type = 
Optional.of(GroupType.parse(group.groupType()));
+                            }
+                            final String protocolType = group.protocolType();
+                            final GroupListing groupListing = new GroupListing(
+                                groupId,
+                                type,
+                                protocolType
+                            );
+                            results.addListing(groupListing);
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) 
{
+                            final ListGroupsResponse response = 
(ListGroupsResponse) abstractResponse;
+                            synchronized (results) {
+                                Errors error = 
Errors.forCode(response.data().errorCode());
+                                // Throwing here skips tryComplete, so the node stays outstanding.
+                                // NOTE(review): presumably the Call machinery retries these two errors - confirm.
+                                if (error == 
Errors.COORDINATOR_LOAD_IN_PROGRESS || error == 
Errors.COORDINATOR_NOT_AVAILABLE) {
+                                    throw error.exception();
+                                } else if (error != Errors.NONE) {
+                                    // Any other error is recorded and the node is treated as done.
+                                    results.addError(error.exception(), node);
+                                } else {
+                                    for (ListGroupsResponseData.ListedGroup 
group : response.data().groups()) {
+                                        maybeAddGroup(group);
+                                    }
+                                }
+                                results.tryComplete(node);
+                            }
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            // A failed per-broker call is recorded as an error and the node is marked done.
+                            synchronized (results) {
+                                results.addError(throwable, node);
+                                results.tryComplete(node);
+                            }
+                        }
+                    }, nowList);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                // The future is completed normally with the exception as its only element,
+                // not completed exceptionally.
+                KafkaException exception = new KafkaException("Failed to find 
brokers to send ListGroups", throwable);
+                all.complete(Collections.singletonList(exception));
+            }
+        }, nowMetadata);
+
+        return new ListGroupsResult(all);
+    }
+
     @Override
     public DescribeConsumerGroupsResult describeConsumerGroups(final 
Collection<String> groupIds,
                                                                final 
DescribeConsumerGroupsOptions options) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java
new file mode 100644
index 00000000000..042c88dc80f
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Options for {@link Admin#listGroups()}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
+
+    private Set<GroupType> types = Collections.emptySet();
+
+    /**
+     * If types is set, only groups of these types will be returned by listGroups().
+     * Otherwise, all groups are returned.
+     *
+     * @param types the group types to list; null or an empty set means no filtering
+     * @return this options object, to allow call chaining
+     */
+    public ListGroupsOptions withTypes(Set<GroupType> types) {
+        // Copy the set so later changes to the caller's set do not affect these options.
+        this.types = (types == null || types.isEmpty()) ? Set.of() : Set.copyOf(types);
+        return this;
+    }
+
+    /**
+     * Returns the set of group types that are requested or empty if no types have been specified.
+     */
+    public Set<GroupType> types() {
+        return types;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java
new file mode 100644
index 00000000000..b19c3e38e9c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The result of the {@link Admin#listGroups()} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListGroupsResult {
+    private final KafkaFutureImpl<Collection<GroupListing>> all;
+    private final KafkaFutureImpl<Collection<GroupListing>> valid;
+    private final KafkaFutureImpl<Collection<Throwable>> errors;
+
+    /**
+     * Splits the mixed collection yielded by the given future into listings and errors.
+     *
+     * @param future yields a collection whose elements are either {@link Throwable}s or
+     *               {@link GroupListing}s
+     */
+    ListGroupsResult(KafkaFuture<Collection<Object>> future) {
+        this.all = new KafkaFutureImpl<>();
+        this.valid = new KafkaFutureImpl<>();
+        this.errors = new KafkaFutureImpl<>();
+        future.thenApply(results -> {
+            ArrayList<Throwable> curErrors = new ArrayList<>();
+            ArrayList<GroupListing> curValid = new ArrayList<>();
+            for (Object resultObject : results) {
+                if (resultObject instanceof Throwable) {
+                    curErrors.add((Throwable) resultObject);
+                } else {
+                    curValid.add((GroupListing) resultObject);
+                }
+            }
+            List<GroupListing> validResult = Collections.unmodifiableList(curValid);
+            List<Throwable> errorsResult = Collections.unmodifiableList(curErrors);
+            // all() fails with the first error seen; valid() and errors() always complete normally.
+            if (!errorsResult.isEmpty()) {
+                all.completeExceptionally(errorsResult.get(0));
+            } else {
+                all.complete(validResult);
+            }
+            valid.complete(validResult);
+            errors.complete(errorsResult);
+            return null;
+        });
+    }
+
+    /**
+     * Returns a future that yields either an exception, or the full set of group listings.
+     * <p>
+     * In the event of a failure, the future yields nothing but the first exception which
+     * occurred.
+     */
+    public KafkaFuture<Collection<GroupListing>> all() {
+        return all;
+    }
+
+    /**
+     * Returns a future which yields just the valid listings.
+     * <p>
+     * This future never fails with an error, no matter what happens.  Errors are completely
+     * ignored.  If nothing can be fetched, an empty collection is yielded.
+     * If there is an error, but some results can be returned, this future will yield
+     * those partial results.  When using this future, it is a good idea to also check
+     * the errors future so that errors can be displayed and handled.
+     */
+    public KafkaFuture<Collection<GroupListing>> valid() {
+        return valid;
+    }
+
+    /**
+     * Returns a future which yields just the errors which occurred.
+     * <p>
+     * If this future yields a non-empty collection, it is very likely that elements are
+     * missing from the valid() set.
+     * <p>
+     * This future itself never fails with an error.  In the event of an error, this future
+     * will successfully yield a collection containing at least one exception.
+     */
+    public KafkaFuture<Collection<Throwable>> errors() {
+        return errors;
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 6a79299d3b2..1a549a6fdec 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -140,6 +140,16 @@ public class AdminClientTestUtils {
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
KafkaFuture.completedFuture(e.getValue()))));
     }
 
+    /**
+     * Builds a ListGroupsResult backed by an already-completed future that holds the given listings.
+     *
+     * @param groups the listings the result should yield
+     * @return a result wrapping those listings
+     */
+    public static ListGroupsResult listGroupsResult(GroupListing... groups) {
+        return new ListGroupsResult(
+            KafkaFuture.completedFuture(Arrays.stream(groups)
+                .collect(Collectors.toList())));
+    }
+
+    /**
+     * Builds a ListGroupsResult backed by an already-completed future whose only element is the given exception.
+     *
+     * @param exception the error the result should report
+     * @return a result wrapping that single error
+     */
+    public static ListGroupsResult listGroupsResult(KafkaException exception) {
+        return new 
ListGroupsResult(KafkaFuture.completedFuture(Collections.singleton(exception)));
+    }
+
     public static ListConsumerGroupOffsetsResult 
listConsumerGroupOffsetsResult(Map<String, Map<TopicPartition, 
OffsetAndMetadata>> offsets) {
         Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> resultMap = offsets.entrySet().stream()
             .collect(Collectors.toMap(e -> 
CoordinatorKey.byGroupId(e.getKey()),
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java
new file mode 100644
index 00000000000..7a8279be34a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.GroupType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link GroupListing}.
+ */
+public class GroupListingTest {
+
+    private static final String GROUP_ID = "mygroup";
+
+    @Test
+    public void testSimpleConsumerGroup() {
+        // Classic type with an empty protocol is the only combination reported as simple.
+        GroupListing classicNoProtocol = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), "");
+        assertTrue(classicNoProtocol.isSimpleConsumerGroup());
+
+        // Classic type with a non-empty protocol is not simple.
+        GroupListing classicWithProtocol =
+            new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE);
+        assertFalse(classicWithProtocol.isSimpleConsumerGroup());
+
+        // A non-classic type is not simple even with an empty protocol.
+        GroupListing consumerNoProtocol = new GroupListing(GROUP_ID, Optional.of(GroupType.CONSUMER), "");
+        assertFalse(consumerNoProtocol.isSimpleConsumerGroup());
+
+        // A missing type is not simple either.
+        GroupListing untypedNoProtocol = new GroupListing(GROUP_ID, Optional.empty(), "");
+        assertFalse(untypedNoProtocol.isSimpleConsumerGroup());
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0fb6280b61c..a864503e1c9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2934,6 +2934,266 @@ public class KafkaAdminClientTest {
         }
     }
 
+    // Lists groups using prepared responses from nodes 0-3: nodes 0, 1 and 2 each report two
+    // groups (node 1 only after two retriable errors) and node 3 reports a fatal error.
+    // all() must fail, while valid() and errors() expose the partial results.
+    @Test
+    public void testListGroups() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(4, 0),
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Empty metadata response should be retried
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    Collections.emptyList(),
+                    env.cluster().clusterResource().clusterId(),
+                    -1,
+                    Collections.emptyList()));
+
+            // Second metadata response carries the cluster's nodes and controller id
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    Collections.emptyList()));
+
+            // Node 0: successful response with two groups
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-1")
+                                
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                                .setGroupType(GroupType.CONSUMER.toString())
+                                .setGroupState("Stable"),
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-connect-1")
+                                .setProtocolType("connector")
+                                .setGroupType(GroupType.CLASSIC.toString())
+                                .setGroupState("Stable")
+                        ))),
+                env.cluster().nodeById(0));
+
+            // handle retriable errors
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                        .setGroups(Collections.emptyList())
+                ),
+                env.cluster().nodeById(1));
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                        .setGroups(Collections.emptyList())
+                ),
+                env.cluster().nodeById(1));
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-2")
+                                .setProtocolType("anyproto")
+                                .setGroupType(GroupType.CLASSIC.toString())
+                                .setGroupState("Stable"),
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-connect-2")
+                                .setProtocolType("connector")
+                                .setGroupType(GroupType.CLASSIC.toString())
+                                .setGroupState("Stable")
+                        ))),
+                env.cluster().nodeById(1));
+
+            // Node 2: successful response with two groups, one of them a share group
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-3")
+                                .setProtocolType("share")
+                                .setGroupType(GroupType.SHARE.toString())
+                                .setGroupState("Stable"),
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-connect-3")
+                                .setProtocolType("connector")
+                                .setGroupType(GroupType.CLASSIC.toString())
+                                .setGroupState("Stable")
+                        ))),
+                env.cluster().nodeById(2));
+
+            // fatal error
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(
+                    new ListGroupsResponseData()
+                        .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+                        .setGroups(Collections.emptyList())),
+                env.cluster().nodeById(3));
+
+            // all() is expected to fail with the fatal error prepared for node 3
+            final ListGroupsResult result = env.adminClient().listGroups();
+            TestUtils.assertFutureError(result.all(), 
UnknownServerException.class);
+
+            // valid() is expected to still return the six groups from nodes 0, 1 and 2
+            Collection<GroupListing> listings = result.valid().get();
+            assertEquals(6, listings.size());
+
+            Set<String> groupIds = new HashSet<>();
+            for (GroupListing listing : listings) {
+                groupIds.add(listing.groupId());
+            }
+
+            assertEquals(Set.of("group-1", "group-connect-1", "group-2", 
"group-connect-2", "group-3", "group-connect-3"), groupIds);
+            // errors() is expected to hold exactly one error
+            assertEquals(1, result.errors().get().size());
+        }
+    }
+
+    // With retries set to 0, a metadata response that contains no brokers must make
+    // listGroups().all() fail with a KafkaException.
+    @Test
+    public void testListGroupsMetadataFailure() throws Exception {
+        final Cluster cluster = mockCluster(3, 0);
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster,
+            AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Empty metadata causes the request to fail since we have no list 
of brokers
+            // to send the ListGroups requests to
+            env.kafkaClient().prepareResponse(
+                RequestTestUtils.metadataResponse(
+                    Collections.emptyList(),
+                    env.cluster().clusterResource().clusterId(),
+                    -1,
+                    Collections.emptyList()));
+
+            final ListGroupsResult result = env.adminClient().listGroups();
+            TestUtils.assertFutureError(result.all(), KafkaException.class);
+        }
+    }
+
+    // A listed group whose response entry sets no protocol type ("group-2") is expected to be
+    // returned as a GroupListing with an empty-string protocol.
+    @Test
+    public void testListGroupsEmptyProtocol() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            // "group-2" deliberately leaves the protocol type unset
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(asList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-1")
+                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                            .setGroupType(GroupType.CONSUMER.toString())
+                            .setGroupState("Stable"),
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-2")
+                            .setGroupType(GroupType.CLASSIC.toString())
+                            .setGroupState("Empty")))),
+                env.cluster().nodeById(0));
+
+            final ListGroupsOptions options = new ListGroupsOptions();
+            final ListGroupsResult result = 
env.adminClient().listGroups(options);
+            Collection<GroupListing> listings = result.valid().get();
+
+            assertEquals(2, listings.size());
+            // NOTE(review): the expected order (group-2 before group-1) differs from the response
+            // order; presumably it mirrors how the client collects listings - confirm.
+            List<GroupListing> expected = new ArrayList<>();
+            expected.add(new GroupListing("group-2", 
Optional.of(GroupType.CLASSIC), ""));
+            expected.add(new GroupListing("group-1", 
Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE));
+            assertEquals(expected, listings);
+            assertEquals(0, result.errors().get().size());
+        }
+    }
+
+    // A listed group whose response entry sets no group type is expected to be returned as a
+    // GroupListing with an empty Optional type.
+    @Test
+    public void testListGroupsEmptyGroupType() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            // Only the group id and protocol type are set; group type and state are left unset
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(List.of(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-1")
+                            .setProtocolType("any")))),
+                env.cluster().nodeById(0));
+
+            final ListGroupsOptions options = new ListGroupsOptions();
+            final ListGroupsResult result = 
env.adminClient().listGroups(options);
+            Collection<GroupListing> listings = result.valid().get();
+
+            assertEquals(1, listings.size());
+            List<GroupListing> expected = new ArrayList<>();
+            expected.add(new GroupListing("group-1", Optional.empty(), "any"));
+            assertEquals(expected, listings);
+            assertEquals(0, result.errors().get().size());
+        }
+    }
+
+    // Lists groups with a CONSUMER type filter set through ListGroupsOptions.withTypes and
+    // checks the listings built from the prepared response.
+    @Test
+    public void testListGroupsWithTypes() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Test with list group options.
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            // NOTE(review): expectListGroupsRequestWithFilters is defined outside this hunk;
+            // presumably it matches a request with no state filter and the CONSUMER type
+            // filter - confirm.
+            env.kafkaClient().prepareResponseFrom(
+                expectListGroupsRequestWithFilters(Collections.emptySet(), 
singleton(GroupType.CONSUMER.toString())),
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(asList(
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-1")
+                            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                            .setGroupState("Stable")
+                            .setGroupType(GroupType.CONSUMER.toString()),
+                        new ListGroupsResponseData.ListedGroup()
+                            .setGroupId("group-2")
+                            .setGroupState("Empty")
+                            .setGroupType(GroupType.CONSUMER.toString())))),
+                env.cluster().nodeById(0));
+
+            final ListGroupsOptions options = new 
ListGroupsOptions().withTypes(singleton(GroupType.CONSUMER));
+            final ListGroupsResult result = 
env.adminClient().listGroups(options);
+            Collection<GroupListing> listing = result.valid().get();
+
+            assertEquals(2, listing.size());
+            // "group-2" had no protocol type in the response, so an empty string is expected
+            List<GroupListing> expected = new ArrayList<>();
+            expected.add(new GroupListing("group-2", 
Optional.of(GroupType.CONSUMER), ""));
+            expected.add(new GroupListing("group-1", 
Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE));
+            assertEquals(expected, listing);
+            assertEquals(0, result.errors().get().size());
+        }
+    }
+
+    // When the node only advertises LIST_GROUPS versions 0-4, listing groups with a type filter
+    // is expected to fail with an UnsupportedVersionException.
+    @Test
+    public void testListGroupsWithTypesOlderBrokerVersion() throws Exception {
+        // Advertised LIST_GROUPS version range used for the mock node below
+        ApiVersion listGroupV4 = new ApiVersion()
+            .setApiKey(ApiKeys.LIST_GROUPS.id)
+            .setMinVersion((short) 0)
+            .setMaxVersion((short) 4);
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            // Check that we cannot set a type filter with an older broker.
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+            // The unsupported-version response only applies to a ListGroupsRequest whose
+            // types filter is non-empty
+            env.kafkaClient().prepareUnsupportedVersionResponse(request ->
+                request instanceof ListGroupsRequest && !((ListGroupsRequest) 
request).data().typesFilter().isEmpty()
+            );
+
+            ListGroupsOptions options = new 
ListGroupsOptions().withTypes(singleton(GroupType.CLASSIC));
+            ListGroupsResult result = env.adminClient().listGroups(options);
+            TestUtils.assertFutureThrows(result.all(), 
UnsupportedVersionException.class);
+        }
+    }
+
     @Test
     public void testListConsumerGroups() throws Exception {
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(4, 0),
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 44ea2bfe8e9..8b6e1267c2f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -19,7 +19,9 @@ package org.apache.kafka.clients.admin;
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
 import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.ElectionType;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -721,6 +723,13 @@ public class MockAdminClient extends AdminClient {
         return new DescribeDelegationTokenResult(future);
     }
 
+    // Mock implementation: the options are ignored. Every key of groupConfigs is reported as a
+    // group of type CONSUMER using the consumer protocol type, in an already-completed future.
+    @Override
+    public synchronized ListGroupsResult listGroups(ListGroupsOptions options) 
{
+        KafkaFutureImpl<Collection<Object>> future = new KafkaFutureImpl<>();
+        future.complete(groupConfigs.keySet().stream().map(g -> new 
GroupListing(g, Optional.of(GroupType.CONSUMER), 
ConsumerProtocol.PROTOCOL_TYPE)).collect(Collectors.toList()));
+        return new ListGroupsResult(future);
+    }
+
     @Override
     public synchronized DescribeConsumerGroupsResult 
describeConsumerGroups(Collection<String> groupIds, 
DescribeConsumerGroupsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 793392b6086..e1cb7411f80 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -100,6 +100,8 @@ import 
org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.admin.ListGroupsResult;
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
@@ -268,6 +270,11 @@ public class TestingMetricsInterceptingAdminClient extends 
AdminClient {
         return adminDelegate.describeDelegationToken(options);
     }
 
+    // Pure pass-through: forwards the call unchanged to the wrapped admin client.
+    @Override
+    public ListGroupsResult listGroups(final ListGroupsOptions options) {
+        return adminDelegate.listGroups(options);
+    }
+
     @Override
     public DescribeConsumerGroupsResult describeConsumerGroups(final 
Collection<String> groupIds, final DescribeConsumerGroupsOptions options) {
         return adminDelegate.describeConsumerGroups(groupIds, options);
diff --git a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
new file mode 100644
index 00000000000..8f5046d6b8a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+
+public class GroupsCommand {
+    private static final Logger LOG = 
LoggerFactory.getLogger(GroupsCommand.class);
+
+    /**
+     * Command-line entry point: runs the tool and passes the resulting status code
+     * to {@link Exit#exit(int)}.
+     *
+     * @param args the command-line arguments
+     */
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    /**
+     * Runs {@link #execute(String...)} and converts the outcome into a status code.
+     * Any {@link Throwable} escaping {@code execute} has its message and stack trace
+     * printed to standard error.
+     *
+     * <p>NOTE(review): {@code execute} itself ends by calling {@code Exit.exit}, so whether
+     * control returns here presumably depends on the installed exit procedure - confirm.
+     *
+     * @param args the command-line arguments
+     * @return 0 if {@code execute} returned normally, 1 if it threw
+     */
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    /**
+     * Parses the arguments, builds the admin client configuration and runs the requested
+     * action. Failures raised while running the action are reported through
+     * {@code printException} and turn the exit code into 1. {@code Exit.exit} is always
+     * called with the exit code once the action has finished, whether it succeeded or not.
+     *
+     * @param args the command-line arguments
+     * @throws Exception if option parsing or loading the command config fails; both happen
+     *                   before the try block and are therefore not caught here
+     */
+    static void execute(String... args) throws Exception {
+        GroupsCommandOptions opts = new GroupsCommandOptions(args);
+
+        // The --bootstrap-server value is put on top of whatever the command config file supplied
+        Properties config = opts.commandConfig();
+        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.bootstrapServer());
+
+        int exitCode = 0;
+        try (GroupsService service = new GroupsService(config)) {
+            if (opts.hasListOption()) {
+                service.listGroups(opts);
+            }
+        } catch (ExecutionException e) {
+            // Report the underlying cause when there is one, rather than the wrapper
+            Throwable cause = e.getCause();
+            if (cause != null) {
+                printException(cause);
+            } else {
+                printException(e);
+            }
+            exitCode = 1;
+        } catch (Throwable t) {
+            printException(t);
+            exitCode = 1;
+        } finally {
+            Exit.exit(exitCode);
+        }
+    }
+
+    /**
+     * Wraps an {@link Admin} client and implements the actions of this tool. Closing the
+     * service closes the wrapped admin client.
+     */
+    public static class GroupsService implements AutoCloseable {
+        private final Admin adminClient;
+
+        /**
+         * Creates a service backed by a new admin client built from {@code config}.
+         *
+         * @param config the admin client configuration
+         */
+        public GroupsService(Properties config) {
+            this.adminClient = Admin.create(config);
+        }
+
+        // Visible for testing
+        GroupsService(Admin adminClient) {
+            this.adminClient = adminClient;
+        }
+
+        /**
+         * Fetches the group listings, waiting at most 30 seconds, and prints those that pass
+         * the filters selected in {@code opts}. The admin call is made without options, so
+         * all filtering happens in this class after the listings have been fetched.
+         *
+         * @param opts the parsed command-line options supplying the filters
+         * @throws Exception if waiting for the listings fails or times out
+         */
+        public void listGroups(GroupsCommandOptions opts) throws Exception {
+            Collection<GroupListing> resources = adminClient.listGroups()
+                    .all().get(30, TimeUnit.SECONDS);
+            printGroupDetails(resources, opts.groupType(), opts.protocol(), 
opts.hasConsumerOption(), opts.hasShareOption());
+        }
+
+        /**
+         * Prints a GROUP / TYPE / PROTOCOL table of the groups that pass
+         * {@code combinedFilter}. The header line is printed even when no group passes.
+         * All columns share one width: the longest printed value, but at least 20.
+         */
+        private void printGroupDetails(Collection<GroupListing> groups,
+                                       Optional<GroupType> groupTypeFilter,
+                                       Optional<String> protocolFilter,
+                                       boolean consumerGroupFilter,
+                                       boolean shareGroupFilter) {
+            List<List<String>> lineItems = new ArrayList<>();
+            // Minimum column width; grows to the longest value seen below
+            int maxLen = 20;
+            for (GroupListing group : groups) {
+                if (combinedFilter(group, groupTypeFilter, protocolFilter, 
consumerGroupFilter, shareGroupFilter)) {
+                    List<String> lineItem = new ArrayList<>();
+                    lineItem.add(group.groupId());
+                    // An absent group type is shown as an empty column
+                    
lineItem.add(group.type().map(GroupType::toString).orElse(""));
+                    lineItem.add(group.protocol());
+                    for (String item : lineItem) {
+                        if (item != null) {
+                            maxLen = Math.max(maxLen, item.length());
+                        }
+                    }
+                    lineItems.add(lineItem);
+                }
+            }
+
+            // Negative width produces a left-justified column, e.g. "%-20s"
+            String fmt = "%" + (-maxLen) + "s";
+            String header = fmt + " " + fmt + " " + fmt;
+            System.out.printf(header, "GROUP", "TYPE", "PROTOCOL");
+            System.out.println();
+            for (List<String> item : lineItems) {
+                // Every cell, including the last one in a row, is followed by a single space
+                for (String atom : item) {
+                    System.out.printf(fmt + " ", atom);
+                }
+                System.out.println();
+            }
+        }
+
+        /**
+         * Decides whether {@code group} passes the active filters. Only the first applicable
+         * rule is used, in this order: group type (combined with the protocol filter when one
+         * is also given), protocol, consumer, share. With no filter active every group passes.
+         *
+         * @return true if the group should be printed
+         */
+        private boolean combinedFilter(GroupListing group,
+                                       Optional<GroupType> groupTypeFilter,
+                                       Optional<String> protocolFilter,
+                                       boolean consumerGroupFilter,
+                                       boolean shareGroupFilter) {
+            boolean pass = true;
+            Optional<GroupType> groupType = group.type();
+            String protocol = group.protocol();
+
+            if (groupTypeFilter.isPresent()) {
+                // A group with no type never matches a group type filter
+                pass = groupType.filter(gt -> gt == 
groupTypeFilter.get()).isPresent()
+                    && protocolFilter.map(protocol::equals).orElse(true);
+            } else if (protocolFilter.isPresent()) {
+                pass = protocol.equals(protocolFilter.get());
+            } else if (consumerGroupFilter) {
+                // Matches the "consumer" protocol, an empty protocol, or group type CONSUMER
+                pass = protocol.equals("consumer") || protocol.isEmpty() || 
groupType.filter(gt -> gt == GroupType.CONSUMER).isPresent();
+            } else if (shareGroupFilter) {
+                pass = groupType.filter(gt -> gt == 
GroupType.SHARE).isPresent();
+            }
+            return pass;
+        }
+
+        /** Closes the wrapped admin client. */
+        @Override
+        public void close() throws Exception {
+            adminClient.close();
+        }
+    }
+
+    /**
+     * Reports a failure: the exception message is printed to standard output and the
+     * stack trace is logged at error level.
+     *
+     * @param e the failure to report
+     */
+    private static void printException(Throwable e) {
+        System.out.println("Error while executing groups command : " + 
e.getMessage());
+        LOG.error(Utils.stackTrace(e));
+    }
+
+    /**
+     * Command-line options of the groups tool. The constructor declares the options, parses
+     * the arguments and validates them via {@link #checkArgs()}.
+     */
+    public static final class GroupsCommandOptions extends 
CommandDefaultOptions {
+        private final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+
+        private final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+
+        private final OptionSpecBuilder listOpt;
+
+        private final ArgumentAcceptingOptionSpec<String> groupTypeOpt;
+
+        private final ArgumentAcceptingOptionSpec<String> protocolOpt;
+
+        private final OptionSpecBuilder consumerOpt;
+
+        private final OptionSpecBuilder shareOpt;
+
+        /**
+         * Declares all options, parses {@code args} and validates the result.
+         *
+         * @param args the command-line arguments
+         */
+        public GroupsCommandOptions(String[] args) {
+            super(args);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: 
The Kafka server to connect to.")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .required()
+                    .ofType(String.class);
+            commandConfigOpt = parser.accepts("command-config", "Property file 
containing configs to be passed to Admin Client.")
+                    .withRequiredArg()
+                    .describedAs("command config property file")
+                    .ofType(String.class);
+
+            listOpt = parser.accepts("list", "List the groups.");
+
+            groupTypeOpt = parser.accepts("group-type", "Filter the groups 
based on group type. "
+                            + "Valid types are: 'classic', 'consumer' and 
'share'.")
+                    .withRequiredArg()
+                    .describedAs("type")
+                    .ofType(String.class);
+
+            protocolOpt = parser.accepts("protocol", "Filter the groups based 
on protocol type.")
+                    .withRequiredArg()
+                    .describedAs("protocol")
+                    .ofType(String.class);
+
+            consumerOpt = parser.accepts("consumer", "Filter the groups to 
show all kinds of consumer groups, including classic and simple consumer 
groups. "
+                            + "This matches group type 'consumer', and group 
type 'classic' where the protocol type is 'consumer' or empty.");
+            shareOpt = parser.accepts("share", "Filter the groups to show 
share groups.");
+
+            options = parser.parse(args);
+
+            checkArgs();
+        }
+
+        /** Returns whether the given option was supplied on the command line. */
+        public Boolean has(OptionSpec<?> builder) {
+            return options.has(builder);
+        }
+
+        /** Returns the option's value, or an empty Optional when the option was not supplied. */
+        public <A> Optional<A> valueAsOption(OptionSpec<A> option) {
+            return valueAsOption(option, Optional.empty());
+        }
+
+        /** Returns the option's value, or {@code defaultValue} when the option was not supplied. */
+        public <A> Optional<A> valueAsOption(OptionSpec<A> option, Optional<A> 
defaultValue) {
+            if (has(option)) {
+                return Optional.of(options.valueOf(option));
+            } else {
+                return defaultValue;
+            }
+        }
+
+        /** Returns the value of --bootstrap-server. */
+        public String bootstrapServer() {
+            return options.valueOf(bootstrapServerOpt);
+        }
+
+        /**
+         * Loads the properties file named by --command-config, or returns empty properties
+         * when that option was not supplied.
+         *
+         * @throws IOException if the properties file cannot be loaded
+         */
+        public Properties commandConfig() throws IOException {
+            if (has(commandConfigOpt)) {
+                return Utils.loadProps(options.valueOf(commandConfigOpt));
+            } else {
+                return new Properties();
+            }
+        }
+
+        /**
+         * Returns the parsed --group-type value. The result is empty when the option was not
+         * supplied and also when the value parses to {@code GroupType.UNKNOWN}.
+         */
+        public Optional<GroupType> groupType() {
+            return valueAsOption(groupTypeOpt).map(GroupType::parse).filter(gt 
-> gt != GroupType.UNKNOWN);
+        }
+
+        /** Returns the value of --protocol, if supplied. */
+        public Optional<String> protocol() {
+            return valueAsOption(protocolOpt);
+        }
+
+        /** Returns whether --consumer was supplied. */
+        public boolean hasConsumerOption() {
+            return has(consumerOpt);
+        }
+
+        /** Returns whether --list was supplied. */
+        public boolean hasListOption() {
+            return has(listOpt);
+        }
+
+        /** Returns whether --share was supplied. */
+        public boolean hasShareOption() {
+            return has(shareOpt);
+        }
+
+        /**
+         * Validates the parsed options: prints usage when no arguments were given, handles
+         * help/version requests, requires exactly one action, rejects an unrecognised
+         * --group-type value and checks for invalid option combinations.
+         *
+         * @throws IllegalArgumentException if --group-type was supplied with a value that
+         *                                  does not parse to a known group type
+         */
+        public void checkArgs() {
+            if (args.length == 0)
+                CommandLineUtils.printUsageAndExit(parser, "This tool helps to 
list groups of all types.");
+
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
list groups of all types.");
+
+            // should have exactly one action
+            long actions = Stream.of(listOpt).filter(options::has).count();
+            if (actions != 1)
+                CommandLineUtils.printUsageAndExit(parser, "Command must 
include exactly one action: --list.");
+
+            // groupType() is empty here only when the supplied value parsed to UNKNOWN
+            if (has(groupTypeOpt)) {
+                if (groupType().isEmpty()) {
+                    throw new IllegalArgumentException("--group-type must be a 
valid group type.");
+                }
+            }
+
+            // check invalid args
+            // NOTE(review): presumably each call rejects its first option when combined with any
+            // of the options after it - confirm against CommandLineUtils.checkInvalidArgs.
+            CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, 
groupTypeOpt, protocolOpt, shareOpt);
+            CommandLineUtils.checkInvalidArgs(parser, options, shareOpt, 
consumerOpt, groupTypeOpt, protocolOpt);
+        }
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
new file mode 100644
index 00000000000..cdea5583653
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Exit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link GroupsCommand}: command-line validation performed when constructing
+ * {@link GroupsCommand.GroupsCommandOptions}, and the table printed by
+ * {@link GroupsCommand.GroupsService#listGroups} when driven by a mocked {@link Admin} client.
+ */
+public class GroupsCommandTest {
+
+    /** Expected output columns (group, type, protocol) for each group of the shared fixture. */
+    private static final String[] CLASSIC_ROW = {"CGclassic", "Classic", "consumer"};
+    private static final String[] CONSUMER_ROW = {"CGconsumer", "Consumer", "consumer"};
+    private static final String[] SHARE_ROW = {"SG", "Share", "share"};
+
+    private final String bootstrapServer = "localhost:9092";
+    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
+
+    @BeforeEach
+    public void setupExitProcedure() {
+        Exit.setExitProcedure(exitProcedure);
+    }
+
+    @AfterEach
+    public void resetExitProcedure() {
+        Exit.resetExitProcedure();
+    }
+
+    @Test
+    public void testOptionsNoActionFails() {
+        assertInitializeInvalidOptionsExitCode(1,
+            new String[] {"--bootstrap-server", bootstrapServer});
+    }
+
+    @Test
+    public void testOptionsListSucceeds() {
+        GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(listArgs());
+        assertTrue(opts.hasListOption());
+    }
+
+    @Test
+    public void testOptionsListConsumerFilterSucceeds() {
+        GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(listArgs("--consumer"));
+        assertTrue(opts.hasListOption());
+        assertTrue(opts.hasConsumerOption());
+    }
+
+    @Test
+    public void testOptionsListShareFilterSucceeds() {
+        GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(listArgs("--share"));
+        assertTrue(opts.hasListOption());
+        assertTrue(opts.hasShareOption());
+    }
+
+    @Test
+    public void testOptionsListProtocolFilterSucceeds() {
+        GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
+            listArgs("--protocol", "anyproto"));
+        assertTrue(opts.hasListOption());
+        assertTrue(opts.protocol().isPresent());
+        assertEquals("anyproto", opts.protocol().get());
+    }
+
+    @Test
+    public void testOptionsListTypeFilterSucceeds() {
+        GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
+            listArgs("--group-type", "share"));
+        assertTrue(opts.hasListOption());
+        assertTrue(opts.groupType().isPresent());
+        assertEquals(GroupType.SHARE, opts.groupType().get());
+    }
+
+    @Test
+    public void testOptionsListInvalidTypeFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1, listArgs("--group-type", "invalid"));
+    }
+
+    @Test
+    public void testOptionsListProtocolAndTypeFiltersSucceeds() {
+        GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
+            listArgs("--protocol", "anyproto", "--group-type", "share"));
+        assertTrue(opts.hasListOption());
+        assertTrue(opts.protocol().isPresent());
+        assertEquals("anyproto", opts.protocol().get());
+        assertTrue(opts.groupType().isPresent());
+        assertEquals(GroupType.SHARE, opts.groupType().get());
+    }
+
+    @Test
+    public void testOptionsListConsumerAndShareFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1, listArgs("--consumer", "--share"));
+    }
+
+    @Test
+    public void testOptionsListConsumerAndProtocolFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1, listArgs("--consumer", "--protocol", "anyproto"));
+    }
+
+    @Test
+    public void testOptionsListConsumerAndTypeFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1, listArgs("--consumer", "--group-type", "share"));
+    }
+
+    @Test
+    public void testOptionsListShareAndProtocolFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1, listArgs("--share", "--protocol", "anyproto"));
+    }
+
+    @Test
+    public void testOptionsListShareAndTypeFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1, listArgs("--share", "--group-type", "classic"));
+    }
+
+    @Test
+    public void testListGroupsEmpty() {
+        String capturedOutput = captureListOutput(AdminClientTestUtils.listGroupsResult());
+        assertCapturedListOutput(capturedOutput);
+    }
+
+    @Test
+    public void testListGroups() {
+        String capturedOutput = captureListOutput(allGroupTypes());
+        assertCapturedListOutput(capturedOutput, CLASSIC_ROW, CONSUMER_ROW, SHARE_ROW);
+    }
+
+    @Test
+    public void testListGroupsConsumerFilter() {
+        String capturedOutput = captureListOutput(allGroupTypes(), "--consumer");
+        assertCapturedListOutput(capturedOutput, CLASSIC_ROW, CONSUMER_ROW);
+    }
+
+    @Test
+    public void testListGroupsShareFilter() {
+        String capturedOutput = captureListOutput(allGroupTypes(), "--share");
+        assertCapturedListOutput(capturedOutput, SHARE_ROW);
+    }
+
+    @Test
+    public void testListGroupsProtocolFilter() {
+        String capturedOutput = captureListOutput(allGroupTypes(), "--protocol", "consumer");
+        assertCapturedListOutput(capturedOutput, CLASSIC_ROW, CONSUMER_ROW);
+    }
+
+    @Test
+    public void testListGroupsTypeFilter() {
+        String capturedOutput = captureListOutput(allGroupTypes(), "--group-type", "share");
+        assertCapturedListOutput(capturedOutput, SHARE_ROW);
+    }
+
+    @Test
+    public void testListGroupsProtocolAndTypeFilter() {
+        String capturedOutput = captureListOutput(allGroupTypes(),
+            "--protocol", "consumer", "--group-type", "classic");
+        assertCapturedListOutput(capturedOutput, CLASSIC_ROW);
+    }
+
+    @Test
+    public void testListGroupsProtocolAndTypeFilterNoMatch() {
+        // No classic group in this fixture, so the combined filter matches nothing.
+        ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
+            new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
+            new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
+        );
+        String capturedOutput = captureListOutput(result,
+            "--protocol", "consumer", "--group-type", "classic");
+        assertCapturedListOutput(capturedOutput);
+    }
+
+    @Test
+    public void testListGroupsFailsWithException() {
+        Admin adminClient = mock(Admin.class);
+        GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
+
+        ListGroupsResult result = AdminClientTestUtils.listGroupsResult(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        when(adminClient.listGroups()).thenReturn(result);
+
+        assertThrows(ExecutionException.class,
+            () -> service.listGroups(new GroupsCommand.GroupsCommandOptions(listArgs())));
+    }
+
+    /**
+     * Builds the argument array {@code --bootstrap-server <server> --list} followed by
+     * {@code extraArgs} in the order given.
+     */
+    private String[] listArgs(String... extraArgs) {
+        String[] args = new String[3 + extraArgs.length];
+        args[0] = "--bootstrap-server";
+        args[1] = bootstrapServer;
+        args[2] = "--list";
+        System.arraycopy(extraArgs, 0, args, 3, extraArgs.length);
+        return args;
+    }
+
+    /** Fixture with one classic, one consumer and one share group. */
+    private static ListGroupsResult allGroupTypes() {
+        return AdminClientTestUtils.listGroupsResult(
+            new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
+            new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
+            new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
+        );
+    }
+
+    /**
+     * Runs the list action against a mocked {@link Admin} whose {@code listGroups()} returns
+     * {@code result}, and returns everything written to standard out.
+     *
+     * @param result     the canned admin response
+     * @param filterArgs extra command-line arguments appended after {@code --list}
+     * @return the captured standard output; any throwable raised while listing fails the test
+     */
+    private String captureListOutput(ListGroupsResult result, String... filterArgs) {
+        Admin adminClient = mock(Admin.class);
+        GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
+        when(adminClient.listGroups()).thenReturn(result);
+
+        String[] args = listArgs(filterArgs);
+        return ToolsTestUtils.captureStandardOut(() -> {
+            try {
+                service.listGroups(new GroupsCommand.GroupsCommandOptions(args));
+            } catch (Throwable t) {
+                fail(t);
+            }
+        });
+    }
+
+    /**
+     * Asserts that constructing {@link GroupsCommand.GroupsCommandOptions} from {@code options}
+     * is rejected.
+     *
+     * <p>A temporary exit procedure checks the exit code against {@code expected} and then
+     * throws a {@link RuntimeException} so the constructor is aborted.
+     *
+     * <p>NOTE(review): the assertion is satisfied by any {@link RuntimeException}; if the
+     * constructor throws one directly without going through {@link Exit}, the exit code is
+     * never compared.
+     */
+    private void assertInitializeInvalidOptionsExitCode(int expected, String[] options) {
+        Exit.setExitProcedure((exitCode, message) -> {
+            assertEquals(expected, exitCode);
+            throw new RuntimeException();
+        });
+        try {
+            assertThrows(RuntimeException.class, () -> new GroupsCommand.GroupsCommandOptions(options));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    /**
+     * Asserts that {@code capturedOutput} consists of the {@code GROUP TYPE PROTOCOL} header
+     * followed by exactly {@code expectedLines}, in order. Column padding is ignored by
+     * collapsing runs of spaces.
+     */
+    private void assertCapturedListOutput(String capturedOutput, String[]... expectedLines) {
+        String[] capturedLines = capturedOutput.split("\n");
+        assertEquals(expectedLines.length + 1, capturedLines.length);
+        assertEquals("GROUP,TYPE,PROTOCOL", columns(capturedLines[0]));
+        for (int row = 0; row < expectedLines.length; row++) {
+            assertEquals(String.join(",", expectedLines[row]), columns(capturedLines[row + 1]));
+        }
+    }
+
+    /** Collapses a space-padded output line into a comma-separated list of its columns. */
+    private static String columns(String line) {
+        return String.join(",", line.split(" +"));
+    }
+}

Reply via email to