This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3fc103b48b6 KAFKA-18629: ShareGroupDeleteState admin client impl. 
(#18928)
3fc103b48b6 is described below

commit 3fc103b48b65fc37d8ae22d4d7c36104ae1b3b35
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Feb 22 21:51:10 2025 +0530

    KAFKA-18629: ShareGroupDeleteState admin client impl. (#18928)
    
    * In this PR, we add various infra classes needed to support the
    `deleteShareGroups` functionality via the `kafka-share-groups.sh`
    script, as well as the implementation of `kafka-share-groups.sh --delete`.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  17 ++
 .../clients/admin/DeleteConsumerGroupsResult.java  |  33 +--
 ...erGroupsResult.java => DeleteGroupsResult.java} |  11 +-
 ...psResult.java => DeleteShareGroupsOptions.java} |  32 +--
 .../clients/admin/DeleteShareGroupsResult.java     |  32 +++
 .../kafka/clients/admin/ForwardingAdmin.java       |   5 +
 .../kafka/clients/admin/KafkaAdminClient.java      |  11 +
 .../internals/DeleteConsumerGroupsHandler.java     | 116 +--------
 ...GroupsHandler.java => DeleteGroupsHandler.java} |  39 ++-
 .../admin/internals/DeleteShareGroupsHandler.java  |  38 +++
 .../kafka/clients/admin/MockAdminClient.java       |   5 +
 .../internals/DeleteConsumerGroupsHandlerTest.java | 112 +--------
 ...ndlerTest.java => DeleteGroupsHandlerTest.java} |  16 +-
 .../internals/DeleteShareGroupsHandlerTest.java    |  26 ++
 .../TestingMetricsInterceptingAdminClient.java     |   7 +
 .../tools/consumer/group/ShareGroupCommand.java    |  84 ++++++-
 .../consumer/group/ShareGroupCommandOptions.java   |   7 +-
 .../consumer/group/ShareGroupCommandTest.java      | 276 +++++++++++++++++++++
 18 files changed, 558 insertions(+), 309 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index b0248b43842..d7479b5ba1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1841,6 +1841,23 @@ public interface Admin extends AutoCloseable {
         return listShareGroupOffsets(groupSpecs, new 
ListShareGroupOffsetsOptions());
     }
 
+    /**
+     * Delete share groups from the cluster with the default options.
+     *
+     * @return The DeleteShareGroupsResult.
+     */
+    default DeleteShareGroupsResult deleteShareGroups(Collection<String> 
groupIds) {
+        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
+    }
+
+    /**
+     * Delete share groups from the cluster.
+     *
+     * @param options The options to use when deleting a share group.
+     * @return The DeleteShareGroupsResult.
+     */
+    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, 
DeleteShareGroupsOptions options);
+
     /**
      * Describe some classic groups in the cluster.
      *
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index 5a7ad396935..902dfd101c1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -14,42 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
- * The result of the {@link Admin#deleteConsumerGroups(Collection)} call.
- *
- * The API of this class is evolving, see {@link Admin} for details.
+ * The result of the {@link Admin#deleteConsumerGroups(Collection <String>, 
DeleteConsumerGroupsOptions)} call.
  */
[email protected]
-public class DeleteConsumerGroupsResult {
-    private final Map<String, KafkaFuture<Void>> futures;
-
-    DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from group id to futures which can be used to check the 
status of
-     * individual deletions.
-     */
-    public Map<String, KafkaFuture<Void>> deletedGroups() {
-        Map<String, KafkaFuture<Void>> deletedGroups = new 
HashMap<>(futures.size());
-        deletedGroups.putAll(futures);
-        return deletedGroups;
-    }
-
-    /**
-     * Return a future which succeeds only if all the consumer group deletions 
succeed.
-     */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture<?>[0]));
+public class DeleteConsumerGroupsResult extends DeleteGroupsResult {
+    public DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> 
futures) {
+        super(futures);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
 b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java
similarity index 83%
copy from 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
copy to 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java
index 5a7ad396935..2be42025e04 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java
@@ -24,15 +24,16 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * The result of the {@link Admin#deleteConsumerGroups(Collection)} call.
- *
+ * The parent class of result of the {@link 
Admin#deleteConsumerGroups(Collection)},
+ * {@link Admin#deleteShareGroups(Collection)} calls.
+ * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
-public class DeleteConsumerGroupsResult {
+public abstract class DeleteGroupsResult {
     private final Map<String, KafkaFuture<Void>> futures;
 
-    DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
+    DeleteGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
         this.futures = futures;
     }
 
@@ -47,7 +48,7 @@ public class DeleteConsumerGroupsResult {
     }
 
     /**
-     * Return a future which succeeds only if all the consumer group deletions 
succeed.
+     * Return a future which succeeds only if all the group deletions succeed.
      */
     public KafkaFuture<Void> all() {
         return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture<?>[0]));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
similarity index 51%
copy from 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
copy to 
clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
index 5a7ad396935..a41ec6d00b3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
@@ -14,42 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.clients.admin;
 
-import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
- * The result of the {@link Admin#deleteConsumerGroups(Collection)} call.
- *
+ * Options for the {@link Admin#deleteShareGroups(Collection <String>, 
DeleteShareGroupsOptions)} call.
+ * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
-public class DeleteConsumerGroupsResult {
-    private final Map<String, KafkaFuture<Void>> futures;
-
-    DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
-        this.futures = futures;
-    }
-
-    /**
-     * Return a map from group id to futures which can be used to check the 
status of
-     * individual deletions.
-     */
-    public Map<String, KafkaFuture<Void>> deletedGroups() {
-        Map<String, KafkaFuture<Void>> deletedGroups = new 
HashMap<>(futures.size());
-        deletedGroups.putAll(futures);
-        return deletedGroups;
-    }
-
-    /**
-     * Return a future which succeeds only if all the consumer group deletions 
succeed.
-     */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture<?>[0]));
-    }
+public class DeleteShareGroupsOptions extends 
AbstractOptions<DeleteShareGroupsOptions> {
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
new file mode 100644
index 00000000000..8abd3e73263
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the {@link Admin#deleteShareGroups(Collection <String>, 
DeleteShareGroupsOptions)} call.
+ */
+public class DeleteShareGroupsResult extends DeleteGroupsResult {
+    DeleteShareGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
+        super(futures);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index b52688e7d3b..ee2ac0942a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -308,6 +308,11 @@ public class ForwardingAdmin implements Admin {
         return delegate.listShareGroupOffsets(groupSpecs, options);
     }
 
+    @Override
+    public DeleteShareGroupsResult deleteShareGroups(Collection<String> 
groupIds, DeleteShareGroupsOptions options) {
+        return delegate.deleteShareGroups(groupIds, options);
+    }
+
     @Override
     public ListGroupsResult listGroups(ListGroupsOptions options) {
         return delegate.listGroups(options);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7c8842d4061..3b31efec17a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -50,6 +50,7 @@ import 
org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import 
org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
 import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
+import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
 import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
@@ -3829,6 +3830,16 @@ public class KafkaAdminClient extends AdminClient {
             .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
     }
 
+    @Override
+    public DeleteShareGroupsResult deleteShareGroups(Collection<String> 
groupIds, DeleteShareGroupsOptions options) {
+        SimpleAdminApiFuture<CoordinatorKey, Void> future =
+            DeleteShareGroupsHandler.newFuture(groupIds);
+        DeleteShareGroupsHandler handler = new 
DeleteShareGroupsHandler(logContext);
+        invokeDriver(handler, future, options.timeoutMs);
+        return new DeleteShareGroupsResult(future.all().entrySet().stream()
+            .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
+    }
+
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
         return Collections.unmodifiableMap(this.metrics.metrics());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
index 0d581243ddc..c3b248838f3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
@@ -16,37 +16,14 @@
  */
 package org.apache.kafka.clients.admin.internals;
 
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.message.DeleteGroupsRequestData;
-import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.AbstractResponse;
-import org.apache.kafka.common.requests.DeleteGroupsRequest;
-import org.apache.kafka.common.requests.DeleteGroupsResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
 import org.apache.kafka.common.utils.LogContext;
 
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class DeleteConsumerGroupsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Void> {
-
-    private final Logger log;
-    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+public class DeleteConsumerGroupsHandler extends DeleteGroupsHandler {
 
     public DeleteConsumerGroupsHandler(
         LogContext logContext
     ) {
-        this.log = logContext.logger(DeleteConsumerGroupsHandler.class);
-        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
+        super(logContext, DeleteConsumerGroupsHandler.class);
     }
 
     @Override
@@ -55,92 +32,7 @@ public class DeleteConsumerGroupsHandler extends 
AdminApiHandler.Batched<Coordin
     }
 
     @Override
-    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
-        return lookupStrategy;
-    }
-
-    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Void> 
newFuture(
-        Collection<String> groupIds
-    ) {
-        return AdminApiFuture.forKeys(buildKeySet(groupIds));
-    }
-
-    private static Set<CoordinatorKey> buildKeySet(Collection<String> 
groupIds) {
-        return groupIds.stream()
-            .map(CoordinatorKey::byGroupId)
-            .collect(Collectors.toSet());
-    }
-
-    @Override
-    public DeleteGroupsRequest.Builder buildBatchedRequest(
-        int coordinatorId,
-        Set<CoordinatorKey> keys
-    ) {
-        List<String> groupIds = keys.stream().map(key -> 
key.idValue).collect(Collectors.toList());
-        DeleteGroupsRequestData data = new DeleteGroupsRequestData()
-            .setGroupsNames(groupIds);
-        return new DeleteGroupsRequest.Builder(data);
-    }
-
-    @Override
-    public ApiResult<CoordinatorKey, Void> handleResponse(
-        Node coordinator,
-        Set<CoordinatorKey> groupIds,
-        AbstractResponse abstractResponse
-    ) {
-        final DeleteGroupsResponse response = (DeleteGroupsResponse) 
abstractResponse;
-        final Map<CoordinatorKey, Void> completed = new HashMap<>();
-        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
-
-        for (DeletableGroupResult deletedGroup : response.data().results()) {
-            CoordinatorKey groupIdKey = 
CoordinatorKey.byGroupId(deletedGroup.groupId());
-            Errors error = Errors.forCode(deletedGroup.errorCode());
-            if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, groupsToUnmap);
-                continue;
-            }
-
-            completed.put(groupIdKey, null);
-        }
-
-        return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
+    public String displayName() {
+        return "DeleteConsumerGroups";
     }
-
-    private void handleError(
-        CoordinatorKey groupId,
-        Errors error,
-        Map<CoordinatorKey, Throwable> failed,
-        Set<CoordinatorKey> groupsToUnmap
-    ) {
-        switch (error) {
-            case GROUP_AUTHORIZATION_FAILED:
-            case INVALID_GROUP_ID:
-            case NON_EMPTY_GROUP:
-            case GROUP_ID_NOT_FOUND:
-                log.debug("`DeleteConsumerGroups` request for group id {} 
failed due to error {}", groupId.idValue, error);
-                failed.put(groupId, error.exception());
-                break;
-
-            case COORDINATOR_LOAD_IN_PROGRESS:
-                // If the coordinator is in the middle of loading, then we 
just need to retry
-                log.debug("`DeleteConsumerGroups` request for group id {} 
failed because the coordinator " +
-                    "is still in the process of loading state. Will retry", 
groupId.idValue);
-                break;
-
-            case COORDINATOR_NOT_AVAILABLE:
-            case NOT_COORDINATOR:
-                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
-                // the key so that we retry the `FindCoordinator` request
-                log.debug("`DeleteConsumerGroups` request for group id {} 
returned error {}. " +
-                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
-                groupsToUnmap.add(groupId);
-                break;
-
-            default:
-                log.error("`DeleteConsumerGroups` request for group id {} 
failed due to unexpected error {}", groupId.idValue, error);
-                failed.put(groupId, error.exception());
-        }
-    }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java
similarity index 79%
copy from 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
copy to 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java
index 0d581243ddc..bf8b7a2210b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java
@@ -14,16 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.clients.admin.internals;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.message.DeleteGroupsRequestData;
-import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
@@ -37,22 +38,21 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class DeleteConsumerGroupsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Void> {
-
+public abstract class DeleteGroupsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Void> {
     private final Logger log;
     private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
 
-    public DeleteConsumerGroupsHandler(
-        LogContext logContext
+    public DeleteGroupsHandler(
+        LogContext logContext,
+        Class<?> loggerClass
     ) {
-        this.log = logContext.logger(DeleteConsumerGroupsHandler.class);
-        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
+        this.log = logContext.logger(loggerClass);
+        this.lookupStrategy = new 
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
     }
 
-    @Override
-    public String apiName() {
-        return "deleteConsumerGroups";
-    }
+    public abstract String apiName();
+
+    public abstract String displayName();
 
     @Override
     public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
@@ -93,7 +93,7 @@ public class DeleteConsumerGroupsHandler extends 
AdminApiHandler.Batched<Coordin
         final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
         final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
 
-        for (DeletableGroupResult deletedGroup : response.data().results()) {
+        for (DeleteGroupsResponseData.DeletableGroupResult deletedGroup : 
response.data().results()) {
             CoordinatorKey groupIdKey = 
CoordinatorKey.byGroupId(deletedGroup.groupId());
             Errors error = Errors.forCode(deletedGroup.errorCode());
             if (error != Errors.NONE) {
@@ -118,29 +118,28 @@ public class DeleteConsumerGroupsHandler extends 
AdminApiHandler.Batched<Coordin
             case INVALID_GROUP_ID:
             case NON_EMPTY_GROUP:
             case GROUP_ID_NOT_FOUND:
-                log.debug("`DeleteConsumerGroups` request for group id {} 
failed due to error {}", groupId.idValue, error);
+                log.debug("`{}` request for group id {} failed due to error 
{}", displayName(), groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we 
just need to retry
-                log.debug("`DeleteConsumerGroups` request for group id {} 
failed because the coordinator " +
-                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                log.debug("`{}` request for group id {} failed because the 
coordinator " +
+                    "is still in the process of loading state. Will retry", 
displayName(), groupId.idValue);
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
                 // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
                 // the key so that we retry the `FindCoordinator` request
-                log.debug("`DeleteConsumerGroups` request for group id {} 
returned error {}. " +
-                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                log.debug("`{}` request for group id {} returned error {}. " +
+                    "Will attempt to find the coordinator again and retry", 
displayName(), groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
 
             default:
-                log.error("`DeleteConsumerGroups` request for group id {} 
failed due to unexpected error {}", groupId.idValue, error);
+                log.error("`{}` request for group id {} failed due to 
unexpected error {}", displayName(), groupId.idValue, error);
                 failed.put(groupId, error.exception());
         }
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java
new file mode 100644
index 00000000000..9b43509d415
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+
+public class DeleteShareGroupsHandler extends DeleteGroupsHandler {
+    public DeleteShareGroupsHandler(
+        LogContext logContext
+    ) {
+        super(logContext, DeleteShareGroupsHandler.class);
+    }
+
+    @Override
+    public String apiName() {
+        return "deleteShareGroups";
+    }
+
+    @Override
+    public String displayName() {
+        return "DeleteShareGroups";
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index e91a2f92204..5c56665eb7d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1399,6 +1399,11 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public synchronized DeleteShareGroupsResult 
deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions 
options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public synchronized DescribeClassicGroupsResult 
describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions 
options) {
         throw new UnsupportedOperationException("Not implemented yet");
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
index 8c4d9eb0eff..773708aa2f6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
@@ -14,117 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.clients.admin.internals;
 
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.GroupIdNotFoundException;
-import org.apache.kafka.common.errors.GroupNotEmptyException;
-import org.apache.kafka.common.errors.InvalidGroupIdException;
-import org.apache.kafka.common.message.DeleteGroupsResponseData;
-import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
-import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.DeleteGroupsRequest;
-import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.utils.LogContext;
 
-import org.junit.jupiter.api.Test;
-
-import static java.util.Collections.emptyList;
-import static java.util.Collections.emptySet;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-
-public class DeleteConsumerGroupsHandlerTest {
-
-    private final LogContext logContext = new LogContext();
-    private final String groupId1 = "group-id1";
-
-    @Test
-    public void testBuildRequest() {
-        DeleteConsumerGroupsHandler handler = new 
DeleteConsumerGroupsHandler(logContext);
-        DeleteGroupsRequest request = handler.buildBatchedRequest(1, 
singleton(CoordinatorKey.byGroupId(groupId1))).build();
-        assertEquals(1, request.data().groupsNames().size());
-        assertEquals(groupId1, request.data().groupsNames().get(0));
-    }
-
-    @Test
-    public void testSuccessfulHandleResponse() {
-        assertCompleted(handleWithError(Errors.NONE));
-    }
-
-    @Test
-    public void testUnmappedHandleResponse() {
-        assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
-        assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
-    }
-
-    @Test
-    public void testRetriableHandleResponse() {
-        assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
-    }
-
-    @Test
-    public void testFailedHandleResponse() {
-        assertFailed(GroupAuthorizationException.class, 
handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
-        assertFailed(GroupIdNotFoundException.class, 
handleWithError(Errors.GROUP_ID_NOT_FOUND));
-        assertFailed(InvalidGroupIdException.class, 
handleWithError(Errors.INVALID_GROUP_ID));
-        assertFailed(GroupNotEmptyException.class, 
handleWithError(Errors.NON_EMPTY_GROUP));
-    }
-
-    private DeleteGroupsResponse buildResponse(Errors error) {
-        return new DeleteGroupsResponse(
-                new DeleteGroupsResponseData()
-                    .setResults(new 
DeletableGroupResultCollection(singletonList(
-                            new DeletableGroupResult()
-                                .setErrorCode(error.code())
-                                .setGroupId(groupId1)).iterator())));
-    }
-
-    private AdminApiHandler.ApiResult<CoordinatorKey, Void> handleWithError(
-        Errors error
-    ) {
-        DeleteConsumerGroupsHandler handler = new 
DeleteConsumerGroupsHandler(logContext);
-        DeleteGroupsResponse response = buildResponse(error);
-        return handler.handleResponse(new Node(1, "host", 1234), 
singleton(CoordinatorKey.byGroupId(groupId1)), response);
-    }
-
-    private void assertUnmapped(
-        AdminApiHandler.ApiResult<CoordinatorKey, Void> result
-    ) {
-        assertEquals(emptySet(), result.completedKeys.keySet());
-        assertEquals(emptySet(), result.failedKeys.keySet());
-        assertEquals(singletonList(CoordinatorKey.byGroupId(groupId1)), 
result.unmappedKeys);
-    }
-
-    private void assertRetriable(
-        AdminApiHandler.ApiResult<CoordinatorKey, Void> result
-    ) {
-        assertEquals(emptySet(), result.completedKeys.keySet());
-        assertEquals(emptySet(), result.failedKeys.keySet());
-        assertEquals(emptyList(), result.unmappedKeys);
-    }
-
-    private void assertCompleted(
-        AdminApiHandler.ApiResult<CoordinatorKey, Void> result
-    ) {
-        CoordinatorKey key = CoordinatorKey.byGroupId(groupId1);
-        assertEquals(emptySet(), result.failedKeys.keySet());
-        assertEquals(emptyList(), result.unmappedKeys);
-        assertEquals(singleton(key), result.completedKeys.keySet());
-    }
-
-    private void assertFailed(
-        Class<? extends Throwable> expectedExceptionType,
-        AdminApiHandler.ApiResult<CoordinatorKey, Void> result
-    ) {
-        CoordinatorKey key = CoordinatorKey.byGroupId(groupId1);
-        assertEquals(emptySet(), result.completedKeys.keySet());
-        assertEquals(emptyList(), result.unmappedKeys);
-        assertEquals(singleton(key), result.failedKeys.keySet());
-        assertInstanceOf(expectedExceptionType, result.failedKeys.get(key));
+public class DeleteConsumerGroupsHandlerTest extends DeleteGroupsHandlerTest {
+    protected DeleteGroupsHandler handler() {
+        return new DeleteConsumerGroupsHandler(new LogContext());
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java
similarity index 94%
copy from 
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
copy to 
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java
index 8c4d9eb0eff..6b65de8d716 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java
@@ -27,8 +27,8 @@ import 
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupRe
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsResponse;
-import org.apache.kafka.common.utils.LogContext;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import static java.util.Collections.emptyList;
@@ -38,14 +38,21 @@ import static java.util.Collections.singletonList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 
-public class DeleteConsumerGroupsHandlerTest {
+public abstract class DeleteGroupsHandlerTest {
 
-    private final LogContext logContext = new LogContext();
     private final String groupId1 = "group-id1";
 
+    private DeleteGroupsHandler handler;
+
+    @BeforeEach
+    public void setup() {
+        handler = handler();
+    }
+
+    protected abstract DeleteGroupsHandler handler();
+
     @Test
     public void testBuildRequest() {
-        DeleteConsumerGroupsHandler handler = new 
DeleteConsumerGroupsHandler(logContext);
         DeleteGroupsRequest request = handler.buildBatchedRequest(1, 
singleton(CoordinatorKey.byGroupId(groupId1))).build();
         assertEquals(1, request.data().groupsNames().size());
         assertEquals(groupId1, request.data().groupsNames().get(0));
@@ -87,7 +94,6 @@ public class DeleteConsumerGroupsHandlerTest {
     private AdminApiHandler.ApiResult<CoordinatorKey, Void> handleWithError(
         Errors error
     ) {
-        DeleteConsumerGroupsHandler handler = new 
DeleteConsumerGroupsHandler(logContext);
         DeleteGroupsResponse response = buildResponse(error);
         return handler.handleResponse(new Node(1, "host", 1234), 
singleton(CoordinatorKey.byGroupId(groupId1)), response);
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java
new file mode 100644
index 00000000000..26f9d97f77e
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+
+public class DeleteShareGroupsHandlerTest extends DeleteGroupsHandlerTest {
+    protected DeleteGroupsHandler handler() {
+        return new DeleteShareGroupsHandler(new LogContext());
+    }
+}
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 888d690e1f6..c996e148b2b 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -55,6 +55,8 @@ import 
org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
 import org.apache.kafka.clients.admin.DeleteRecordsOptions;
 import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
+import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
 import org.apache.kafka.clients.admin.DeleteTopicsOptions;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.DescribeAclsOptions;
@@ -428,6 +430,11 @@ public class TestingMetricsInterceptingAdminClient extends 
AdminClient {
         return adminDelegate.listShareGroupOffsets(groupSpecs, options);
     }
 
+    @Override
+    public DeleteShareGroupsResult deleteShareGroups(final Collection<String> 
groupIds, final DeleteShareGroupsOptions options) {
+        return adminDelegate.deleteShareGroups(groupIds, options);
+    }
+
     @Override
     public DescribeClassicGroupsResult describeClassicGroups(final 
Collection<String> groupIds, final DescribeClassicGroupsOptions options) {
         return adminDelegate.describeClassicGroups(groupIds, options);
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index e5b01a12d9e..246a5eecbf1 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.tools.consumer.group;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AbstractOptions;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
@@ -30,6 +32,7 @@ import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
 
@@ -40,6 +43,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -81,7 +85,7 @@ public class ShareGroupCommand {
             } else if (opts.options.has(opts.describeOpt)) {
                 shareGroupService.describeGroups();
             } else if (opts.options.has(opts.deleteOpt)) {
-                throw new UnsupportedOperationException("--delete option is 
not yet implemented");
+                shareGroupService.deleteShareGroups();
             } else if (opts.options.has(opts.resetOffsetsOpt)) {
                 throw new UnsupportedOperationException("--reset-offsets 
option is not yet implemented");
             } else if (opts.options.has(opts.deleteOffsetsOpt)) {
@@ -154,6 +158,18 @@ public class ShareGroupCommand {
             }
         }
 
+        List<GroupListing> listDetailedShareGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.SHARE)));
+                Collection<GroupListing> listings = result.all().get();
+                return listings.stream().toList();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
         List<GroupListing> listShareGroupsInStates(Set<GroupState> states) 
throws ExecutionException, InterruptedException {
             ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
                 .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
@@ -208,6 +224,72 @@ public class ShareGroupCommand {
             }
         }
 
+        Map<String, Throwable> deleteShareGroups() {
+            List<GroupListing> shareGroupIds = listDetailedShareGroups();
+            List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+                ? shareGroupIds.stream().map(GroupListing::groupId).toList()
+                : opts.options.valuesOf(opts.groupOpt);
+
+            // Pre admin call checks
+            LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+            Map<String, Exception> errGroups = new HashMap<>();
+            for (String groupId : groupIdSet) {
+                Optional<GroupListing> listing = 
shareGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny();
+                if (listing.isEmpty()) {
+                    errGroups.put(groupId, new IllegalArgumentException("Group 
'" + groupId + "' is not a share group."));
+                } else {
+                    Optional<GroupState> groupState = 
listing.get().groupState();
+                    groupState.ifPresent(state -> {
+                        if (state == GroupState.DEAD) {
+                            errGroups.put(groupId, new 
IllegalStateException("Share group '" + groupId + "' group state is DEAD."));
+                        } else if (state != GroupState.EMPTY) {
+                            errGroups.put(groupId, new 
GroupNotEmptyException("Share group '" + groupId + "' is not EMPTY."));
+                        }
+                    });
+                }
+            }
+
+            groupIdSet.removeAll(errGroups.keySet());
+
+            Map<String, KafkaFuture<Void>> groupsToDelete = 
groupIdSet.isEmpty() ? Map.of() : adminClient.deleteShareGroups(
+                groupIdSet.stream().toList(),
+                withTimeoutMs(new DeleteShareGroupsOptions())
+            ).deletedGroups();
+
+            Map<String, Throwable> success = new HashMap<>();
+            Map<String, Throwable> failed = new HashMap<>(errGroups);
+
+            groupsToDelete.forEach((g, f) -> {
+                try {
+                    f.get();
+                    success.put(g, null);
+                } catch (InterruptedException ie) {
+                    failed.put(g, ie);
+                } catch (ExecutionException e) {
+                    failed.put(g, e.getCause());
+                }
+            });
+
+            if (failed.isEmpty())
+                System.out.println("Deletion of requested share groups (" + 
success.keySet().stream().map(group -> "'" + group + 
"'").collect(Collectors.joining(", ")) + ") was successful.");
+            else {
+                printError("Deletion of some share groups failed:", 
Optional.empty());
+                failed.forEach((group, error) -> System.out.println("* Share 
group '" + group + "' could not be deleted due to: " + error));
+
+                if (!success.isEmpty())
+                    System.out.println("\nThese share groups were deleted 
successfully: " + success.keySet().stream().map(group -> "'" + group + 
"'").collect(Collectors.joining(",")));
+            }
+
+            failed.putAll(success);
+
+            return failed;
+        }
+
+        private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
+            int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
+            return options.timeoutMs(t);
+        }
+
         Map<String, ShareGroupDescription> 
describeShareGroups(Collection<String> groupIds) throws ExecutionException, 
InterruptedException {
             Map<String, ShareGroupDescription> res = new HashMap<>();
             Map<String, KafkaFuture<ShareGroupDescription>> 
stringKafkaFutureMap = adminClient.describeShareGroups(
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index 0c1a2a285ec..3ba0a707ee5 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -173,9 +173,12 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
         }
 
         if (options.has(deleteOpt)) {
-            if (!options.has(groupOpt))
+            if (!options.has(groupOpt) && !options.has(allGroupsOpt))
+                CommandLineUtils.printUsageAndExit(parser,
+                    String.format("Option %s takes the options %s or %s", 
deleteOpt, groupOpt, allGroupsOpt));
+            if (options.has(allGroupsOpt) && options.has(groupOpt))
                 CommandLineUtils.printUsageAndExit(parser,
-                    "Option " + deleteOpt + " takes the option: " + groupOpt);
+                    String.format("Option %s takes either %s or %s, not 
both.", deleteOpt, groupOpt, allGroupsOpt));
             if (options.has(topicOpt))
                 CommandLineUtils.printUsageAndExit(parser,
                     "Option " + deleteOpt + " does not take the option: " + 
topicOpt);
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index db58847d34a..46103e10f43 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
 import org.apache.kafka.clients.admin.GroupListing;
@@ -36,11 +38,14 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
 import 
org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatchers;
 
@@ -48,7 +53,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -56,6 +63,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
 import joptsimple.OptionException;
@@ -63,11 +71,15 @@ import joptsimple.OptionException;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ShareGroupCommandTest {
@@ -76,6 +88,18 @@ public class ShareGroupCommandTest {
     private static final List<List<String>> DESCRIBE_TYPE_STATE = 
List.of(List.of("--state"), List.of("--state", "--verbose"));
     private static final List<List<String>> DESCRIBE_TYPES = 
Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, 
DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList();
 
+    @BeforeEach
+    public void setup() {
+        // nothing by default
+        Exit.setExitProcedure(((statusCode, message) -> {
+        }));
+    }
+
+    @AfterEach
+    public void teardown() {
+        Exit.resetExitProcedure();
+    }
+
     @Test
     public void testListShareGroups() throws Exception {
         String firstGroup = "first-group";
@@ -588,8 +612,260 @@ public class ShareGroupCommandTest {
         assertThrows(IllegalArgumentException.class, () -> 
ShareGroupCommand.groupStatesFromString("   ,   ,"));
     }
 
+    @Test
+    public void testDeleteShareGroupsArgs() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        mockListShareGroups(adminClient, new LinkedHashMap<>());
+
+        // no group spec args
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete] takes the options 
[group] or [all-groups]"));
+            exited.set(true);
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupsSuccess() {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete", "--group", firstGroup, "--group", secondGroup};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+        Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+            firstGroup, KafkaFuture.completedFuture(null),
+            secondGroup, KafkaFuture.completedFuture(null)
+        );
+
+        LinkedHashMap<String, GroupState> shareGroupMap = new 
LinkedHashMap<>();
+        shareGroupMap.put(firstGroup, GroupState.EMPTY);
+        shareGroupMap.put(secondGroup, GroupState.EMPTY);
+        mockListShareGroups(adminClient, shareGroupMap);
+
+        when(result.deletedGroups()).thenReturn(deletedGroups);
+
+        Map<String, Throwable> expectedResults = new HashMap<>();
+        expectedResults.put(firstGroup, null);
+        expectedResults.put(secondGroup, null);
+
+        when(adminClient.deleteShareGroups(anyList(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            assertEquals(expectedResults, service.deleteShareGroups());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupsAllGroupsSuccess() {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete", "--all-groups"};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+        Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+            firstGroup, KafkaFuture.completedFuture(null),
+            secondGroup, KafkaFuture.completedFuture(null)
+        );
+
+        LinkedHashMap<String, GroupState> shareGroupMap = new 
LinkedHashMap<>();
+        shareGroupMap.put(firstGroup, GroupState.EMPTY);
+        shareGroupMap.put(secondGroup, GroupState.EMPTY);
+        mockListShareGroups(adminClient, shareGroupMap);
+
+        when(result.deletedGroups()).thenReturn(deletedGroups);
+
+        Map<String, Throwable> expectedResults = new HashMap<>();
+        expectedResults.put(firstGroup, null);
+        expectedResults.put(secondGroup, null);
+
+        when(adminClient.deleteShareGroups(anyList(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            assertEquals(expectedResults, service.deleteShareGroups());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupsAllGroupsPartialFail() {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete", "--all-groups"};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+        KafkaFutureImpl<Void> future1 = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Void> future2 = new KafkaFutureImpl<>();
+        future1.complete(null);
+        Exception exp = new Exception("bad");
+        future2.completeExceptionally(exp);
+        Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+            firstGroup, future1,
+            secondGroup, future2
+        );
+
+        LinkedHashMap<String, GroupState> shareGroupMap = new 
LinkedHashMap<>();
+        shareGroupMap.put(firstGroup, GroupState.EMPTY);
+        shareGroupMap.put(secondGroup, GroupState.EMPTY);
+        mockListShareGroups(adminClient, shareGroupMap);
+
+        when(result.deletedGroups()).thenReturn(deletedGroups);
+
+        Map<String, Throwable> expectedResults = new HashMap<>();
+        expectedResults.put(firstGroup, null);
+        expectedResults.put(secondGroup, exp);
+
+        when(adminClient.deleteShareGroups(anyList(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            assertEquals(expectedResults, service.deleteShareGroups());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupsDeleteFailure() {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete", "--group", firstGroup, "--group", secondGroup};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+
+        LinkedHashMap<String, GroupState> shareGroupMap = new 
LinkedHashMap<>();
+        shareGroupMap.put(firstGroup, GroupState.EMPTY);
+        shareGroupMap.put(secondGroup, GroupState.EMPTY);
+        mockListShareGroups(adminClient, shareGroupMap);
+
+        KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+        Exception exp = new Exception("bad");
+        future.completeExceptionally(exp);
+        Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+            firstGroup, future,
+            secondGroup, future
+        );
+
+        when(result.deletedGroups()).thenReturn(deletedGroups);
+
+        Map<String, Throwable> expectedResults = new HashMap<>();
+        expectedResults.put(firstGroup, exp);
+        expectedResults.put(secondGroup, exp);
+
+        when(adminClient.deleteShareGroups(anyList(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            assertEquals(expectedResults, service.deleteShareGroups());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupsFailureNonShareGroup() {
+        String firstGroup = "first-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete", "--group", firstGroup};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+        mockListShareGroups(adminClient, new LinkedHashMap<>());
+
+        when(result.deletedGroups()).thenReturn(Map.of());
+
+        when(adminClient.deleteShareGroups(anyList(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            service.deleteShareGroups();
+            verify(result, times(0)).deletedGroups();
+            verify(adminClient, times(0)).deleteShareGroups(anyList());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupsFailureNonEmptyGroup() {
+        String firstGroup = "first-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete", "--group", firstGroup};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+
+        LinkedHashMap<String, GroupState> shareGroupMap = new 
LinkedHashMap<>();
+        shareGroupMap.put(firstGroup, GroupState.STABLE);
+        mockListShareGroups(adminClient, shareGroupMap);
+
+        when(result.deletedGroups()).thenReturn(Map.of());
+
+        when(adminClient.deleteShareGroups(anyList(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            service.deleteShareGroups();
+            verify(result, times(0)).deletedGroups();
+            verify(adminClient, times(0)).deleteShareGroups(anyList());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupsPartialFailure() {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete", "--group", firstGroup, "--group", secondGroup};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+        LinkedHashMap<String, GroupState> shareGroupMap = new 
LinkedHashMap<>();
+        shareGroupMap.put(firstGroup, GroupState.EMPTY);
+        shareGroupMap.put(secondGroup, GroupState.EMPTY);
+        mockListShareGroups(adminClient, shareGroupMap);
+        KafkaFutureImpl<Void> future1 = new KafkaFutureImpl<>();
+        KafkaFutureImpl<Void> future2 = new KafkaFutureImpl<>();
+        future1.complete(null);
+        Exception exp = new Exception("bad");
+        future2.completeExceptionally(exp);
+        Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+            firstGroup, future1,
+            secondGroup, future2
+        );
+
+        when(result.deletedGroups()).thenReturn(deletedGroups);
+
+        when(adminClient.deleteShareGroups(anyList(), 
any())).thenReturn(result);
+        Map<String, Throwable> expectedResults = new HashMap<>();
+        expectedResults.put(firstGroup, null);
+        expectedResults.put(secondGroup, exp);
+
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            assertEquals(expectedResults, service.deleteShareGroups());
+        }
+    }
+
+    private void mockListShareGroups(Admin client, LinkedHashMap<String, 
GroupState> groupIds) {
+        ListGroupsResult listResult = mock(ListGroupsResult.class);
+        KafkaFutureImpl<Collection<GroupListing>> listFuture = new 
KafkaFutureImpl<>();
+        List<GroupListing> groupListings = new ArrayList<>();
+        groupIds.forEach((groupId, state) -> groupListings.add(
+            new GroupListing(groupId, Optional.of(GroupType.SHARE), "share", 
Optional.of(state))
+        ));
+        listFuture.complete(groupListings);
+        when(listResult.all()).thenReturn(listFuture);
+        when(client.listGroups(any())).thenReturn(listResult);
+    }
+
     ShareGroupService getShareGroupService(String[] args, Admin adminClient) {
         ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args);
+        opts.checkArgs();
         return new ShareGroupService(opts, adminClient);
     }
 

Reply via email to