This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3fc103b48b6 KAFKA-18629: ShareGroupDeleteState admin client impl.
(#18928)
3fc103b48b6 is described below
commit 3fc103b48b65fc37d8ae22d4d7c36104ae1b3b35
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Feb 22 21:51:10 2025 +0530
KAFKA-18629: ShareGroupDeleteState admin client impl. (#18928)
* In this PR, we add various infra classes needed to support the
`deleteShareGroups` functionality via the `kafka-share-groups.sh`
script, as well as the implementation of `kafka-share-groups.sh --delete`.
Reviewers: Andrew Schofield <[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 17 ++
.../clients/admin/DeleteConsumerGroupsResult.java | 33 +--
...erGroupsResult.java => DeleteGroupsResult.java} | 11 +-
...psResult.java => DeleteShareGroupsOptions.java} | 32 +--
.../clients/admin/DeleteShareGroupsResult.java | 32 +++
.../kafka/clients/admin/ForwardingAdmin.java | 5 +
.../kafka/clients/admin/KafkaAdminClient.java | 11 +
.../internals/DeleteConsumerGroupsHandler.java | 116 +--------
...GroupsHandler.java => DeleteGroupsHandler.java} | 39 ++-
.../admin/internals/DeleteShareGroupsHandler.java | 38 +++
.../kafka/clients/admin/MockAdminClient.java | 5 +
.../internals/DeleteConsumerGroupsHandlerTest.java | 112 +--------
...ndlerTest.java => DeleteGroupsHandlerTest.java} | 16 +-
.../internals/DeleteShareGroupsHandlerTest.java | 26 ++
.../TestingMetricsInterceptingAdminClient.java | 7 +
.../tools/consumer/group/ShareGroupCommand.java | 84 ++++++-
.../consumer/group/ShareGroupCommandOptions.java | 7 +-
.../consumer/group/ShareGroupCommandTest.java | 276 +++++++++++++++++++++
18 files changed, 558 insertions(+), 309 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index b0248b43842..d7479b5ba1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1841,6 +1841,23 @@ public interface Admin extends AutoCloseable {
return listShareGroupOffsets(groupSpecs, new
ListShareGroupOffsetsOptions());
}
+ /**
+ * Delete share groups from the cluster with the default options.
+ *
+ * @return The DeleteShareGroupsResult.
+ */
+ default DeleteShareGroupsResult deleteShareGroups(Collection<String>
groupIds) {
+ return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
+ }
+
+ /**
+ * Delete share groups from the cluster.
+ *
+ * @param options The options to use when deleting a share group.
+ * @return The DeleteShareGroupsResult.
+ */
+ DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds,
DeleteShareGroupsOptions options);
+
/**
* Describe some classic groups in the cluster.
*
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index 5a7ad396935..902dfd101c1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -14,42 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
/**
- * The result of the {@link Admin#deleteConsumerGroups(Collection)} call.
- *
- * The API of this class is evolving, see {@link Admin} for details.
+ * The result of the {@link Admin#deleteConsumerGroups(Collection <String>,
DeleteConsumerGroupsOptions)} call.
*/
[email protected]
-public class DeleteConsumerGroupsResult {
- private final Map<String, KafkaFuture<Void>> futures;
-
- DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
- this.futures = futures;
- }
-
- /**
- * Return a map from group id to futures which can be used to check the
status of
- * individual deletions.
- */
- public Map<String, KafkaFuture<Void>> deletedGroups() {
- Map<String, KafkaFuture<Void>> deletedGroups = new
HashMap<>(futures.size());
- deletedGroups.putAll(futures);
- return deletedGroups;
- }
-
- /**
- * Return a future which succeeds only if all the consumer group deletions
succeed.
- */
- public KafkaFuture<Void> all() {
- return KafkaFuture.allOf(futures.values().toArray(new
KafkaFuture<?>[0]));
+public class DeleteConsumerGroupsResult extends DeleteGroupsResult {
+ public DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>>
futures) {
+ super(futures);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java
similarity index 83%
copy from
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
copy to
clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java
index 5a7ad396935..2be42025e04 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteGroupsResult.java
@@ -24,15 +24,16 @@ import java.util.HashMap;
import java.util.Map;
/**
- * The result of the {@link Admin#deleteConsumerGroups(Collection)} call.
- *
+ * The parent class of result of the {@link
Admin#deleteConsumerGroups(Collection)},
+ * {@link Admin#deleteShareGroups(Collection)} calls.
+ * <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
-public class DeleteConsumerGroupsResult {
+public abstract class DeleteGroupsResult {
private final Map<String, KafkaFuture<Void>> futures;
- DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
+ DeleteGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
this.futures = futures;
}
@@ -47,7 +48,7 @@ public class DeleteConsumerGroupsResult {
}
/**
- * Return a future which succeeds only if all the consumer group deletions
succeed.
+ * Return a future which succeeds only if all the group deletions succeed.
*/
public KafkaFuture<Void> all() {
return KafkaFuture.allOf(futures.values().toArray(new
KafkaFuture<?>[0]));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
similarity index 51%
copy from
clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
copy to
clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
index 5a7ad396935..a41ec6d00b3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
@@ -14,42 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.clients.admin;
-import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
/**
- * The result of the {@link Admin#deleteConsumerGroups(Collection)} call.
- *
+ * Options for the {@link Admin#deleteShareGroups(Collection <String>,
DeleteShareGroupsOptions)} call.
+ * <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
-public class DeleteConsumerGroupsResult {
- private final Map<String, KafkaFuture<Void>> futures;
-
- DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
- this.futures = futures;
- }
-
- /**
- * Return a map from group id to futures which can be used to check the
status of
- * individual deletions.
- */
- public Map<String, KafkaFuture<Void>> deletedGroups() {
- Map<String, KafkaFuture<Void>> deletedGroups = new
HashMap<>(futures.size());
- deletedGroups.putAll(futures);
- return deletedGroups;
- }
-
- /**
- * Return a future which succeeds only if all the consumer group deletions
succeed.
- */
- public KafkaFuture<Void> all() {
- return KafkaFuture.allOf(futures.values().toArray(new
KafkaFuture<?>[0]));
- }
+public class DeleteShareGroupsOptions extends
AbstractOptions<DeleteShareGroupsOptions> {
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
new file mode 100644
index 00000000000..8abd3e73263
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the {@link Admin#deleteShareGroups(Collection <String>,
DeleteShareGroupsOptions)} call.
+ */
+public class DeleteShareGroupsResult extends DeleteGroupsResult {
+ DeleteShareGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
+ super(futures);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index b52688e7d3b..ee2ac0942a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -308,6 +308,11 @@ public class ForwardingAdmin implements Admin {
return delegate.listShareGroupOffsets(groupSpecs, options);
}
+ @Override
+ public DeleteShareGroupsResult deleteShareGroups(Collection<String>
groupIds, DeleteShareGroupsOptions options) {
+ return delegate.deleteShareGroups(groupIds, options);
+ }
+
@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 7c8842d4061..3b31efec17a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -50,6 +50,7 @@ import
org.apache.kafka.clients.admin.internals.CoordinatorKey;
import
org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
+import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
@@ -3829,6 +3830,16 @@ public class KafkaAdminClient extends AdminClient {
.collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
}
+ @Override
+ public DeleteShareGroupsResult deleteShareGroups(Collection<String>
groupIds, DeleteShareGroupsOptions options) {
+ SimpleAdminApiFuture<CoordinatorKey, Void> future =
+ DeleteShareGroupsHandler.newFuture(groupIds);
+ DeleteShareGroupsHandler handler = new
DeleteShareGroupsHandler(logContext);
+ invokeDriver(handler, future, options.timeoutMs);
+ return new DeleteShareGroupsResult(future.all().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
+ }
+
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
index 0d581243ddc..c3b248838f3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
@@ -16,37 +16,14 @@
*/
package org.apache.kafka.clients.admin.internals;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.message.DeleteGroupsRequestData;
-import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.AbstractResponse;
-import org.apache.kafka.common.requests.DeleteGroupsRequest;
-import org.apache.kafka.common.requests.DeleteGroupsResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.utils.LogContext;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-public class DeleteConsumerGroupsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Void> {
-
- private final Logger log;
- private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+public class DeleteConsumerGroupsHandler extends DeleteGroupsHandler {
public DeleteConsumerGroupsHandler(
LogContext logContext
) {
- this.log = logContext.logger(DeleteConsumerGroupsHandler.class);
- this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP,
logContext);
+ super(logContext, DeleteConsumerGroupsHandler.class);
}
@Override
@@ -55,92 +32,7 @@ public class DeleteConsumerGroupsHandler extends
AdminApiHandler.Batched<Coordin
}
@Override
- public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
- return lookupStrategy;
- }
-
- public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Void>
newFuture(
- Collection<String> groupIds
- ) {
- return AdminApiFuture.forKeys(buildKeySet(groupIds));
- }
-
- private static Set<CoordinatorKey> buildKeySet(Collection<String>
groupIds) {
- return groupIds.stream()
- .map(CoordinatorKey::byGroupId)
- .collect(Collectors.toSet());
- }
-
- @Override
- public DeleteGroupsRequest.Builder buildBatchedRequest(
- int coordinatorId,
- Set<CoordinatorKey> keys
- ) {
- List<String> groupIds = keys.stream().map(key ->
key.idValue).collect(Collectors.toList());
- DeleteGroupsRequestData data = new DeleteGroupsRequestData()
- .setGroupsNames(groupIds);
- return new DeleteGroupsRequest.Builder(data);
- }
-
- @Override
- public ApiResult<CoordinatorKey, Void> handleResponse(
- Node coordinator,
- Set<CoordinatorKey> groupIds,
- AbstractResponse abstractResponse
- ) {
- final DeleteGroupsResponse response = (DeleteGroupsResponse)
abstractResponse;
- final Map<CoordinatorKey, Void> completed = new HashMap<>();
- final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
- final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
-
- for (DeletableGroupResult deletedGroup : response.data().results()) {
- CoordinatorKey groupIdKey =
CoordinatorKey.byGroupId(deletedGroup.groupId());
- Errors error = Errors.forCode(deletedGroup.errorCode());
- if (error != Errors.NONE) {
- handleError(groupIdKey, error, failed, groupsToUnmap);
- continue;
- }
-
- completed.put(groupIdKey, null);
- }
-
- return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
+ public String displayName() {
+ return "DeleteConsumerGroups";
}
-
- private void handleError(
- CoordinatorKey groupId,
- Errors error,
- Map<CoordinatorKey, Throwable> failed,
- Set<CoordinatorKey> groupsToUnmap
- ) {
- switch (error) {
- case GROUP_AUTHORIZATION_FAILED:
- case INVALID_GROUP_ID:
- case NON_EMPTY_GROUP:
- case GROUP_ID_NOT_FOUND:
- log.debug("`DeleteConsumerGroups` request for group id {}
failed due to error {}", groupId.idValue, error);
- failed.put(groupId, error.exception());
- break;
-
- case COORDINATOR_LOAD_IN_PROGRESS:
- // If the coordinator is in the middle of loading, then we
just need to retry
- log.debug("`DeleteConsumerGroups` request for group id {}
failed because the coordinator " +
- "is still in the process of loading state. Will retry",
groupId.idValue);
- break;
-
- case COORDINATOR_NOT_AVAILABLE:
- case NOT_COORDINATOR:
- // If the coordinator is unavailable or there was a
coordinator change, then we unmap
- // the key so that we retry the `FindCoordinator` request
- log.debug("`DeleteConsumerGroups` request for group id {}
returned error {}. " +
- "Will attempt to find the coordinator again and retry",
groupId.idValue, error);
- groupsToUnmap.add(groupId);
- break;
-
- default:
- log.error("`DeleteConsumerGroups` request for group id {}
failed due to unexpected error {}", groupId.idValue, error);
- failed.put(groupId, error.exception());
- }
- }
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java
similarity index 79%
copy from
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
copy to
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java
index 0d581243ddc..bf8b7a2210b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandler.java
@@ -14,16 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
-import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
-import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -37,22 +38,21 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-public class DeleteConsumerGroupsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Void> {
-
+public abstract class DeleteGroupsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Void> {
private final Logger log;
private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
- public DeleteConsumerGroupsHandler(
- LogContext logContext
+ public DeleteGroupsHandler(
+ LogContext logContext,
+ Class<?> loggerClass
) {
- this.log = logContext.logger(DeleteConsumerGroupsHandler.class);
- this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP,
logContext);
+ this.log = logContext.logger(loggerClass);
+ this.lookupStrategy = new
CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
}
- @Override
- public String apiName() {
- return "deleteConsumerGroups";
- }
+ public abstract String apiName();
+
+ public abstract String displayName();
@Override
public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
@@ -93,7 +93,7 @@ public class DeleteConsumerGroupsHandler extends
AdminApiHandler.Batched<Coordin
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
- for (DeletableGroupResult deletedGroup : response.data().results()) {
+ for (DeleteGroupsResponseData.DeletableGroupResult deletedGroup :
response.data().results()) {
CoordinatorKey groupIdKey =
CoordinatorKey.byGroupId(deletedGroup.groupId());
Errors error = Errors.forCode(deletedGroup.errorCode());
if (error != Errors.NONE) {
@@ -118,29 +118,28 @@ public class DeleteConsumerGroupsHandler extends
AdminApiHandler.Batched<Coordin
case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
- log.debug("`DeleteConsumerGroups` request for group id {}
failed due to error {}", groupId.idValue, error);
+ log.debug("`{}` request for group id {} failed due to error
{}", displayName(), groupId.idValue, error);
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we
just need to retry
- log.debug("`DeleteConsumerGroups` request for group id {}
failed because the coordinator " +
- "is still in the process of loading state. Will retry",
groupId.idValue);
+ log.debug("`{}` request for group id {} failed because the
coordinator " +
+ "is still in the process of loading state. Will retry",
displayName(), groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
// If the coordinator is unavailable or there was a
coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
- log.debug("`DeleteConsumerGroups` request for group id {}
returned error {}. " +
- "Will attempt to find the coordinator again and retry",
groupId.idValue, error);
+ log.debug("`{}` request for group id {} returned error {}. " +
+ "Will attempt to find the coordinator again and retry",
displayName(), groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
default:
- log.error("`DeleteConsumerGroups` request for group id {}
failed due to unexpected error {}", groupId.idValue, error);
+ log.error("`{}` request for group id {} failed due to
unexpected error {}", displayName(), groupId.idValue, error);
failed.put(groupId, error.exception());
}
}
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java
new file mode 100644
index 00000000000..9b43509d415
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+
+public class DeleteShareGroupsHandler extends DeleteGroupsHandler {
+ public DeleteShareGroupsHandler(
+ LogContext logContext
+ ) {
+ super(logContext, DeleteShareGroupsHandler.class);
+ }
+
+ @Override
+ public String apiName() {
+ return "deleteShareGroups";
+ }
+
+ @Override
+ public String displayName() {
+ return "DeleteShareGroups";
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index e91a2f92204..5c56665eb7d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1399,6 +1399,11 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public synchronized DeleteShareGroupsResult
deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions
options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
@Override
public synchronized DescribeClassicGroupsResult
describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions
options) {
throw new UnsupportedOperationException("Not implemented yet");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
index 8c4d9eb0eff..773708aa2f6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
@@ -14,117 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.kafka.clients.admin.internals;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.GroupIdNotFoundException;
-import org.apache.kafka.common.errors.GroupNotEmptyException;
-import org.apache.kafka.common.errors.InvalidGroupIdException;
-import org.apache.kafka.common.message.DeleteGroupsResponseData;
-import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
-import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.DeleteGroupsRequest;
-import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.utils.LogContext;
-import org.junit.jupiter.api.Test;
-
-import static java.util.Collections.emptyList;
-import static java.util.Collections.emptySet;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-
-public class DeleteConsumerGroupsHandlerTest {
-
- private final LogContext logContext = new LogContext();
- private final String groupId1 = "group-id1";
-
- @Test
- public void testBuildRequest() {
- DeleteConsumerGroupsHandler handler = new
DeleteConsumerGroupsHandler(logContext);
- DeleteGroupsRequest request = handler.buildBatchedRequest(1,
singleton(CoordinatorKey.byGroupId(groupId1))).build();
- assertEquals(1, request.data().groupsNames().size());
- assertEquals(groupId1, request.data().groupsNames().get(0));
- }
-
- @Test
- public void testSuccessfulHandleResponse() {
- assertCompleted(handleWithError(Errors.NONE));
- }
-
- @Test
- public void testUnmappedHandleResponse() {
- assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
- assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
- }
-
- @Test
- public void testRetriableHandleResponse() {
- assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
- }
-
- @Test
- public void testFailedHandleResponse() {
- assertFailed(GroupAuthorizationException.class,
handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
- assertFailed(GroupIdNotFoundException.class,
handleWithError(Errors.GROUP_ID_NOT_FOUND));
- assertFailed(InvalidGroupIdException.class,
handleWithError(Errors.INVALID_GROUP_ID));
- assertFailed(GroupNotEmptyException.class,
handleWithError(Errors.NON_EMPTY_GROUP));
- }
-
- private DeleteGroupsResponse buildResponse(Errors error) {
- return new DeleteGroupsResponse(
- new DeleteGroupsResponseData()
- .setResults(new
DeletableGroupResultCollection(singletonList(
- new DeletableGroupResult()
- .setErrorCode(error.code())
- .setGroupId(groupId1)).iterator())));
- }
-
- private AdminApiHandler.ApiResult<CoordinatorKey, Void> handleWithError(
- Errors error
- ) {
- DeleteConsumerGroupsHandler handler = new
DeleteConsumerGroupsHandler(logContext);
- DeleteGroupsResponse response = buildResponse(error);
- return handler.handleResponse(new Node(1, "host", 1234),
singleton(CoordinatorKey.byGroupId(groupId1)), response);
- }
-
- private void assertUnmapped(
- AdminApiHandler.ApiResult<CoordinatorKey, Void> result
- ) {
- assertEquals(emptySet(), result.completedKeys.keySet());
- assertEquals(emptySet(), result.failedKeys.keySet());
- assertEquals(singletonList(CoordinatorKey.byGroupId(groupId1)),
result.unmappedKeys);
- }
-
- private void assertRetriable(
- AdminApiHandler.ApiResult<CoordinatorKey, Void> result
- ) {
- assertEquals(emptySet(), result.completedKeys.keySet());
- assertEquals(emptySet(), result.failedKeys.keySet());
- assertEquals(emptyList(), result.unmappedKeys);
- }
-
- private void assertCompleted(
- AdminApiHandler.ApiResult<CoordinatorKey, Void> result
- ) {
- CoordinatorKey key = CoordinatorKey.byGroupId(groupId1);
- assertEquals(emptySet(), result.failedKeys.keySet());
- assertEquals(emptyList(), result.unmappedKeys);
- assertEquals(singleton(key), result.completedKeys.keySet());
- }
-
- private void assertFailed(
- Class<? extends Throwable> expectedExceptionType,
- AdminApiHandler.ApiResult<CoordinatorKey, Void> result
- ) {
- CoordinatorKey key = CoordinatorKey.byGroupId(groupId1);
- assertEquals(emptySet(), result.completedKeys.keySet());
- assertEquals(emptyList(), result.unmappedKeys);
- assertEquals(singleton(key), result.failedKeys.keySet());
- assertInstanceOf(expectedExceptionType, result.failedKeys.get(key));
+public class DeleteConsumerGroupsHandlerTest extends DeleteGroupsHandlerTest {
+ protected DeleteGroupsHandler handler() {
+ return new DeleteConsumerGroupsHandler(new LogContext());
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java
similarity index 94%
copy from
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
copy to
clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java
index 8c4d9eb0eff..6b65de8d716 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteGroupsHandlerTest.java
@@ -27,8 +27,8 @@ import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupRe
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
-import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static java.util.Collections.emptyList;
@@ -38,14 +38,21 @@ import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-public class DeleteConsumerGroupsHandlerTest {
+public abstract class DeleteGroupsHandlerTest {
- private final LogContext logContext = new LogContext();
private final String groupId1 = "group-id1";
+ private DeleteGroupsHandler handler;
+
+ @BeforeEach
+ public void setup() {
+ handler = handler();
+ }
+
+ protected abstract DeleteGroupsHandler handler();
+
@Test
public void testBuildRequest() {
- DeleteConsumerGroupsHandler handler = new
DeleteConsumerGroupsHandler(logContext);
DeleteGroupsRequest request = handler.buildBatchedRequest(1,
singleton(CoordinatorKey.byGroupId(groupId1))).build();
assertEquals(1, request.data().groupsNames().size());
assertEquals(groupId1, request.data().groupsNames().get(0));
@@ -87,7 +94,6 @@ public class DeleteConsumerGroupsHandlerTest {
private AdminApiHandler.ApiResult<CoordinatorKey, Void> handleWithError(
Errors error
) {
- DeleteConsumerGroupsHandler handler = new
DeleteConsumerGroupsHandler(logContext);
DeleteGroupsResponse response = buildResponse(error);
return handler.handleResponse(new Node(1, "host", 1234),
singleton(CoordinatorKey.byGroupId(groupId1)), response);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java
new file mode 100644
index 00000000000..26f9d97f77e
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupsHandlerTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+
+public class DeleteShareGroupsHandlerTest extends DeleteGroupsHandlerTest {
+ protected DeleteGroupsHandler handler() {
+ return new DeleteShareGroupsHandler(new LogContext());
+ }
+}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index 888d690e1f6..c996e148b2b 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -55,6 +55,8 @@ import
org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
+import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
@@ -428,6 +430,11 @@ public class TestingMetricsInterceptingAdminClient extends
AdminClient {
return adminDelegate.listShareGroupOffsets(groupSpecs, options);
}
+ @Override
+ public DeleteShareGroupsResult deleteShareGroups(final Collection<String>
groupIds, final DeleteShareGroupsOptions options) {
+ return adminDelegate.deleteShareGroups(groupIds, options);
+ }
+
@Override
public DescribeClassicGroupsResult describeClassicGroups(final
Collection<String> groupIds, final DescribeClassicGroupsOptions options) {
return adminDelegate.describeClassicGroups(groupIds, options);
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index e5b01a12d9e..246a5eecbf1 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -17,7 +17,9 @@
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
@@ -30,6 +32,7 @@ import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -40,6 +43,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -81,7 +85,7 @@ public class ShareGroupCommand {
} else if (opts.options.has(opts.describeOpt)) {
shareGroupService.describeGroups();
} else if (opts.options.has(opts.deleteOpt)) {
- throw new UnsupportedOperationException("--delete option is
not yet implemented");
+ shareGroupService.deleteShareGroups();
} else if (opts.options.has(opts.resetOffsetsOpt)) {
throw new UnsupportedOperationException("--reset-offsets
option is not yet implemented");
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
@@ -154,6 +158,18 @@ public class ShareGroupCommand {
}
}
+ List<GroupListing> listDetailedShareGroups() {
+ try {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.SHARE)));
+ Collection<GroupListing> listings = result.all().get();
+ return listings.stream().toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
List<GroupListing> listShareGroupsInStates(Set<GroupState> states)
throws ExecutionException, InterruptedException {
ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
@@ -208,6 +224,72 @@ public class ShareGroupCommand {
}
}
+ Map<String, Throwable> deleteShareGroups() {
+ List<GroupListing> shareGroupIds = listDetailedShareGroups();
+ List<String> groupIds = opts.options.has(opts.allGroupsOpt)
+ ? shareGroupIds.stream().map(GroupListing::groupId).toList()
+ : opts.options.valuesOf(opts.groupOpt);
+
+ // Pre admin call checks
+ LinkedHashSet<String> groupIdSet = new LinkedHashSet<>(groupIds);
+ Map<String, Exception> errGroups = new HashMap<>();
+ for (String groupId : groupIdSet) {
+ Optional<GroupListing> listing =
shareGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny();
+ if (listing.isEmpty()) {
+ errGroups.put(groupId, new IllegalArgumentException("Group
'" + groupId + "' is not a share group."));
+ } else {
+ Optional<GroupState> groupState =
listing.get().groupState();
+ groupState.ifPresent(state -> {
+ if (state == GroupState.DEAD) {
+ errGroups.put(groupId, new
IllegalStateException("Share group '" + groupId + "' group state is DEAD."));
+ } else if (state != GroupState.EMPTY) {
+ errGroups.put(groupId, new
GroupNotEmptyException("Share group '" + groupId + "' is not EMPTY."));
+ }
+ });
+ }
+ }
+
+ groupIdSet.removeAll(errGroups.keySet());
+
+ Map<String, KafkaFuture<Void>> groupsToDelete =
groupIdSet.isEmpty() ? Map.of() : adminClient.deleteShareGroups(
+ groupIdSet.stream().toList(),
+ withTimeoutMs(new DeleteShareGroupsOptions())
+ ).deletedGroups();
+
+ Map<String, Throwable> success = new HashMap<>();
+ Map<String, Throwable> failed = new HashMap<>(errGroups);
+
+ groupsToDelete.forEach((g, f) -> {
+ try {
+ f.get();
+ success.put(g, null);
+ } catch (InterruptedException ie) {
+ failed.put(g, ie);
+ } catch (ExecutionException e) {
+ failed.put(g, e.getCause());
+ }
+ });
+
+ if (failed.isEmpty())
+ System.out.println("Deletion of requested share groups (" +
success.keySet().stream().map(group -> "'" + group +
"'").collect(Collectors.joining(", ")) + ") was successful.");
+ else {
+ printError("Deletion of some share groups failed:",
Optional.empty());
+ failed.forEach((group, error) -> System.out.println("* Share
group '" + group + "' could not be deleted due to: " + error));
+
+ if (!success.isEmpty())
+ System.out.println("\nThese share groups were deleted
successfully: " + success.keySet().stream().map(group -> "'" + group +
"'").collect(Collectors.joining(",")));
+ }
+
+ failed.putAll(success);
+
+ return failed;
+ }
+
+ private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
+ int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
+ return options.timeoutMs(t);
+ }
+
Map<String, ShareGroupDescription>
describeShareGroups(Collection<String> groupIds) throws ExecutionException,
InterruptedException {
Map<String, ShareGroupDescription> res = new HashMap<>();
Map<String, KafkaFuture<ShareGroupDescription>>
stringKafkaFutureMap = adminClient.describeShareGroups(
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index 0c1a2a285ec..3ba0a707ee5 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -173,9 +173,12 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
}
if (options.has(deleteOpt)) {
- if (!options.has(groupOpt))
+ if (!options.has(groupOpt) && !options.has(allGroupsOpt))
+ CommandLineUtils.printUsageAndExit(parser,
+ String.format("Option %s takes the options %s or %s",
deleteOpt, groupOpt, allGroupsOpt));
+ if (options.has(allGroupsOpt) && options.has(groupOpt))
CommandLineUtils.printUsageAndExit(parser,
- "Option " + deleteOpt + " takes the option: " + groupOpt);
+ String.format("Option %s takes either %s or %s, not
both.", deleteOpt, groupOpt, allGroupsOpt));
if (options.has(topicOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + deleteOpt + " does not take the option: " +
topicOpt);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index db58847d34a..46103e10f43 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.tools.consumer.group;
+
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
@@ -36,11 +38,14 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import
org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
@@ -48,7 +53,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -56,6 +63,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import joptsimple.OptionException;
@@ -63,11 +71,15 @@ import joptsimple.OptionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ShareGroupCommandTest {
@@ -76,6 +88,18 @@ public class ShareGroupCommandTest {
private static final List<List<String>> DESCRIBE_TYPE_STATE =
List.of(List.of("--state"), List.of("--state", "--verbose"));
private static final List<List<String>> DESCRIBE_TYPES =
Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS,
DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList();
+ @BeforeEach
+ public void setup() {
+ // nothing by default
+ Exit.setExitProcedure(((statusCode, message) -> {
+ }));
+ }
+
+ @AfterEach
+ public void teardown() {
+ Exit.resetExitProcedure();
+ }
+
@Test
public void testListShareGroups() throws Exception {
String firstGroup = "first-group";
@@ -588,8 +612,260 @@ public class ShareGroupCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ShareGroupCommand.groupStatesFromString(" , ,"));
}
+ @Test
+ public void testDeleteShareGroupsArgs() {
+ String bootstrapServer = "localhost:9092";
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ mockListShareGroups(adminClient, new LinkedHashMap<>());
+
+ // no group spec args
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete"};
+ AtomicBoolean exited = new AtomicBoolean(false);
+ Exit.setExitProcedure(((statusCode, message) -> {
+ assertNotEquals(0, statusCode);
+ assertTrue(message.contains("Option [delete] takes the options
[group] or [all-groups]"));
+ exited.set(true);
+ }));
+ try {
+ getShareGroupService(cgcArgs, adminClient);
+ } finally {
+ assertTrue(exited.get());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupsSuccess() {
+ String firstGroup = "first-group";
+ String secondGroup = "second-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete", "--group", firstGroup, "--group", secondGroup};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+ Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+ firstGroup, KafkaFuture.completedFuture(null),
+ secondGroup, KafkaFuture.completedFuture(null)
+ );
+
+ LinkedHashMap<String, GroupState> shareGroupMap = new
LinkedHashMap<>();
+ shareGroupMap.put(firstGroup, GroupState.EMPTY);
+ shareGroupMap.put(secondGroup, GroupState.EMPTY);
+ mockListShareGroups(adminClient, shareGroupMap);
+
+ when(result.deletedGroups()).thenReturn(deletedGroups);
+
+ Map<String, Throwable> expectedResults = new HashMap<>();
+ expectedResults.put(firstGroup, null);
+ expectedResults.put(secondGroup, null);
+
+ when(adminClient.deleteShareGroups(anyList(),
any())).thenReturn(result);
+
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ assertEquals(expectedResults, service.deleteShareGroups());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupsAllGroupsSuccess() {
+ String firstGroup = "first-group";
+ String secondGroup = "second-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete", "--all-groups"};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+ Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+ firstGroup, KafkaFuture.completedFuture(null),
+ secondGroup, KafkaFuture.completedFuture(null)
+ );
+
+ LinkedHashMap<String, GroupState> shareGroupMap = new
LinkedHashMap<>();
+ shareGroupMap.put(firstGroup, GroupState.EMPTY);
+ shareGroupMap.put(secondGroup, GroupState.EMPTY);
+ mockListShareGroups(adminClient, shareGroupMap);
+
+ when(result.deletedGroups()).thenReturn(deletedGroups);
+
+ Map<String, Throwable> expectedResults = new HashMap<>();
+ expectedResults.put(firstGroup, null);
+ expectedResults.put(secondGroup, null);
+
+ when(adminClient.deleteShareGroups(anyList(),
any())).thenReturn(result);
+
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ assertEquals(expectedResults, service.deleteShareGroups());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupsAllGroupsPartialFail() {
+ String firstGroup = "first-group";
+ String secondGroup = "second-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete", "--all-groups"};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+ KafkaFutureImpl<Void> future1 = new KafkaFutureImpl<>();
+ KafkaFutureImpl<Void> future2 = new KafkaFutureImpl<>();
+ future1.complete(null);
+ Exception exp = new Exception("bad");
+ future2.completeExceptionally(exp);
+ Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+ firstGroup, future1,
+ secondGroup, future2
+ );
+
+ LinkedHashMap<String, GroupState> shareGroupMap = new
LinkedHashMap<>();
+ shareGroupMap.put(firstGroup, GroupState.EMPTY);
+ shareGroupMap.put(secondGroup, GroupState.EMPTY);
+ mockListShareGroups(adminClient, shareGroupMap);
+
+ when(result.deletedGroups()).thenReturn(deletedGroups);
+
+ Map<String, Throwable> expectedResults = new HashMap<>();
+ expectedResults.put(firstGroup, null);
+ expectedResults.put(secondGroup, exp);
+
+ when(adminClient.deleteShareGroups(anyList(),
any())).thenReturn(result);
+
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ assertEquals(expectedResults, service.deleteShareGroups());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupsDeleteFailure() {
+ String firstGroup = "first-group";
+ String secondGroup = "second-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete", "--group", firstGroup, "--group", secondGroup};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+
+ LinkedHashMap<String, GroupState> shareGroupMap = new
LinkedHashMap<>();
+ shareGroupMap.put(firstGroup, GroupState.EMPTY);
+ shareGroupMap.put(secondGroup, GroupState.EMPTY);
+ mockListShareGroups(adminClient, shareGroupMap);
+
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ Exception exp = new Exception("bad");
+ future.completeExceptionally(exp);
+ Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+ firstGroup, future,
+ secondGroup, future
+ );
+
+ when(result.deletedGroups()).thenReturn(deletedGroups);
+
+ Map<String, Throwable> expectedResults = new HashMap<>();
+ expectedResults.put(firstGroup, exp);
+ expectedResults.put(secondGroup, exp);
+
+ when(adminClient.deleteShareGroups(anyList(),
any())).thenReturn(result);
+
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ assertEquals(expectedResults, service.deleteShareGroups());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupsFailureNonShareGroup() {
+ String firstGroup = "first-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete", "--group", firstGroup};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+ mockListShareGroups(adminClient, new LinkedHashMap<>());
+
+ when(result.deletedGroups()).thenReturn(Map.of());
+
+ when(adminClient.deleteShareGroups(anyList(),
any())).thenReturn(result);
+
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ service.deleteShareGroups();
+ verify(result, times(0)).deletedGroups();
+ verify(adminClient, times(0)).deleteShareGroups(anyList());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupsFailureNonEmptyGroup() {
+ String firstGroup = "first-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete", "--group", firstGroup};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+
+ LinkedHashMap<String, GroupState> shareGroupMap = new
LinkedHashMap<>();
+ shareGroupMap.put(firstGroup, GroupState.STABLE);
+ mockListShareGroups(adminClient, shareGroupMap);
+
+ when(result.deletedGroups()).thenReturn(Map.of());
+
+ when(adminClient.deleteShareGroups(anyList(),
any())).thenReturn(result);
+
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ service.deleteShareGroups();
+ verify(result, times(0)).deletedGroups();
+ verify(adminClient, times(0)).deleteShareGroups(anyList());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupsPartialFailure() {
+ String firstGroup = "first-group";
+ String secondGroup = "second-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete", "--group", firstGroup, "--group", secondGroup};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupsResult result = mock(DeleteShareGroupsResult.class);
+ LinkedHashMap<String, GroupState> shareGroupMap = new
LinkedHashMap<>();
+ shareGroupMap.put(firstGroup, GroupState.EMPTY);
+ shareGroupMap.put(secondGroup, GroupState.EMPTY);
+ mockListShareGroups(adminClient, shareGroupMap);
+ KafkaFutureImpl<Void> future1 = new KafkaFutureImpl<>();
+ KafkaFutureImpl<Void> future2 = new KafkaFutureImpl<>();
+ future1.complete(null);
+ Exception exp = new Exception("bad");
+ future2.completeExceptionally(exp);
+ Map<String, KafkaFuture<Void>> deletedGroups = Map.of(
+ firstGroup, future1,
+ secondGroup, future2
+ );
+
+ when(result.deletedGroups()).thenReturn(deletedGroups);
+
+ when(adminClient.deleteShareGroups(anyList(),
any())).thenReturn(result);
+ Map<String, Throwable> expectedResults = new HashMap<>();
+ expectedResults.put(firstGroup, null);
+ expectedResults.put(secondGroup, exp);
+
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ assertEquals(expectedResults, service.deleteShareGroups());
+ }
+ }
+
+ private void mockListShareGroups(Admin client, LinkedHashMap<String,
GroupState> groupIds) {
+ ListGroupsResult listResult = mock(ListGroupsResult.class);
+ KafkaFutureImpl<Collection<GroupListing>> listFuture = new
KafkaFutureImpl<>();
+ List<GroupListing> groupListings = new ArrayList<>();
+ groupIds.forEach((groupId, state) -> groupListings.add(
+ new GroupListing(groupId, Optional.of(GroupType.SHARE), "share",
Optional.of(state))
+ ));
+ listFuture.complete(groupListings);
+ when(listResult.all()).thenReturn(listFuture);
+ when(client.listGroups(any())).thenReturn(listResult);
+ }
+
ShareGroupService getShareGroupService(String[] args, Admin adminClient) {
ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args);
+ opts.checkArgs();
return new ShareGroupService(opts, adminClient);
}