This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b9099301fbd KAFKA-16716: Add admin list and describe share groups
(#16827)
b9099301fbd is described below
commit b9099301fbd0a5b35e46b4f9ac026263c753cce6
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Aug 7 20:16:25 2024 +0100
KAFKA-16716: Add admin list and describe share groups (#16827)
Adds the methods to the admin client for listing and describing share
groups.
There are some unit tests, but integration tests will be in a follow-on PR.
Reviewers: Manikumar Reddy <[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 51 ++-
.../clients/admin/DescribeShareGroupsOptions.java | 41 +++
.../clients/admin/DescribeShareGroupsResult.java | 68 ++++
.../kafka/clients/admin/ForwardingAdmin.java | 10 +
.../kafka/clients/admin/KafkaAdminClient.java | 142 ++++++++
.../clients/admin/ListShareGroupsOptions.java | 51 +++
.../kafka/clients/admin/ListShareGroupsResult.java | 84 +++++
.../kafka/clients/admin/ShareGroupDescription.java | 124 +++++++
.../kafka/clients/admin/ShareGroupListing.java | 97 ++++++
.../internals/DescribeShareGroupsHandler.java | 198 +++++++++++
.../java/org/apache/kafka/common/GroupType.java | 3 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 363 +++++++++++++++++++++
.../kafka/clients/admin/MockAdminClient.java | 10 +
13 files changed, 1237 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 8d62069b279..71166ed0872 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -875,23 +875,23 @@ public interface Admin extends AutoCloseable {
DescribeDelegationTokenResult
describeDelegationToken(DescribeDelegationTokenOptions options);
/**
- * Describe some group IDs in the cluster.
+ * Describe some consumer groups in the cluster.
*
* @param groupIds The IDs of the groups to describe.
* @param options The options to use when describing the groups.
- * @return The DescribeConsumerGroupResult.
+ * @return The DescribeConsumerGroupsResult.
*/
DescribeConsumerGroupsResult describeConsumerGroups(Collection<String>
groupIds,
DescribeConsumerGroupsOptions options);
/**
- * Describe some group IDs in the cluster, with the default options.
+ * Describe some consumer groups in the cluster, with the default options.
* <p>
* This is a convenience method for {@link
#describeConsumerGroups(Collection, DescribeConsumerGroupsOptions)}
* with default options. See the overload for more details.
*
* @param groupIds The IDs of the groups to describe.
- * @return The DescribeConsumerGroupResult.
+ * @return The DescribeConsumerGroupsResult.
*/
default DescribeConsumerGroupsResult
describeConsumerGroups(Collection<String> groupIds) {
return describeConsumerGroups(groupIds, new
DescribeConsumerGroupsOptions());
@@ -1767,6 +1767,49 @@ public interface Admin extends AutoCloseable {
RemoveRaftVoterOptions options
);
+ /**
+ * Describe some share groups in the cluster.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param options The options to use when describing the groups.
+ * @return The DescribeShareGroupsResult.
+ */
+ DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
+ DescribeShareGroupsOptions
options);
+
+ /**
+ * Describe some share groups in the cluster, with the default options.
+ * <p>
+ * This is a convenience method for {@link
#describeShareGroups(Collection, DescribeShareGroupsOptions)}
+ * with default options. See the overload for more details.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @return The DescribeShareGroupsResult.
+ */
+ default DescribeShareGroupsResult describeShareGroups(Collection<String>
groupIds) {
+ return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
+ }
+
+ /**
+ * List the share groups available in the cluster.
+ *
+ * @param options The options to use when listing the share groups.
+ * @return The ListShareGroupsResult.
+ */
+ ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);
+
+ /**
+ * List the share groups available in the cluster with the default options.
+ * <p>
+ * This is a convenience method for {@link
#listShareGroups(ListShareGroupsOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @return The ListShareGroupsResult.
+ */
+ default ListShareGroupsResult listShareGroups() {
+ return listShareGroups(new ListShareGroupsOptions());
+ }
+
/**
* Get the metrics kept by the adminClient
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsOptions.java
new file mode 100644
index 00000000000..2ac37bafed1
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for {@link Admin#describeShareGroups(Collection,
DescribeShareGroupsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DescribeShareGroupsOptions extends
AbstractOptions<DescribeShareGroupsOptions> {
+ private boolean includeAuthorizedOperations;
+
+ public DescribeShareGroupsOptions includeAuthorizedOperations(boolean
includeAuthorizedOperations) {
+ this.includeAuthorizedOperations = includeAuthorizedOperations;
+ return this;
+ }
+
+ public boolean includeAuthorizedOperations() {
+ return includeAuthorizedOperations;
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsResult.java
new file mode 100644
index 00000000000..0536b9e3f9d
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeShareGroupsResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The result of the {@link KafkaAdminClient#describeShareGroups(Collection,
DescribeShareGroupsOptions)}} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DescribeShareGroupsResult {
+
+ private final Map<String, KafkaFuture<ShareGroupDescription>> futures;
+
+ public DescribeShareGroupsResult(final Map<String,
KafkaFuture<ShareGroupDescription>> futures) {
+ this.futures = futures;
+ }
+
+ /**
+ * Return a map from group id to futures which yield share group
descriptions.
+ */
+ public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
+ return new HashMap<>(futures);
+ }
+
+ /**
+ * Return a future which yields all ShareGroupDescription objects, if all
the describes succeed.
+ */
+ public KafkaFuture<Map<String, ShareGroupDescription>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new
KafkaFuture[0])).thenApply(
+ nil -> {
+ Map<String, ShareGroupDescription> descriptions = new
HashMap<>(futures.size());
+ futures.forEach((key, future) -> {
+ try {
+ descriptions.put(key, future.get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, since the
KafkaFuture#allOf already ensured
+ // that all of the futures completed successfully.
+ throw new RuntimeException(e);
+ }
+ });
+ return descriptions;
+ });
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index ad15c22498f..1f842444802 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -298,6 +298,16 @@ public class ForwardingAdmin implements Admin {
return delegate.removeRaftVoter(voterId, voterDirectoryId, options);
}
+ @Override
+ public DescribeShareGroupsResult describeShareGroups(Collection<String>
groupIds, DescribeShareGroupsOptions options) {
+ return delegate.describeShareGroups(groupIds, options);
+ }
+
+ @Override
+ public ListShareGroupsResult listShareGroups(ListShareGroupsOptions
options) {
+ return delegate.listShareGroups(options);
+ }
+
@Override
public Map<MetricName, ? extends Metric> metrics() {
return delegate.metrics();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3672b61647d..d6dea2c57a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -50,6 +50,7 @@ import
org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
+import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import
org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
@@ -68,6 +69,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicCollection.TopicIdCollection;
import org.apache.kafka.common.TopicCollection.TopicNameCollection;
@@ -3671,6 +3673,146 @@ public class KafkaAdminClient extends AdminClient {
return new
DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)),
partitions);
}
+ private static final class ListShareGroupsResults {
+ private final List<Throwable> errors;
+ private final HashMap<String, ShareGroupListing> listings;
+ private final HashSet<Node> remaining;
+ private final KafkaFutureImpl<Collection<Object>> future;
+
+ ListShareGroupsResults(Collection<Node> leaders,
+ KafkaFutureImpl<Collection<Object>> future) {
+ this.errors = new ArrayList<>();
+ this.listings = new HashMap<>();
+ this.remaining = new HashSet<>(leaders);
+ this.future = future;
+ tryComplete();
+ }
+
+ synchronized void addError(Throwable throwable, Node node) {
+ ApiError error = ApiError.fromThrowable(throwable);
+ if (error.message() == null || error.message().isEmpty()) {
+ errors.add(error.error().exception("Error listing groups on "
+ node));
+ } else {
+ errors.add(error.error().exception("Error listing groups on "
+ node + ": " + error.message()));
+ }
+ }
+
+ synchronized void addListing(ShareGroupListing listing) {
+ listings.put(listing.groupId(), listing);
+ }
+
+ synchronized void tryComplete(Node leader) {
+ remaining.remove(leader);
+ tryComplete();
+ }
+
+ private synchronized void tryComplete() {
+ if (remaining.isEmpty()) {
+ ArrayList<Object> results = new ArrayList<>(listings.values());
+ results.addAll(errors);
+ future.complete(results);
+ }
+ }
+ }
+
+ @Override
+ public DescribeShareGroupsResult describeShareGroups(final
Collection<String> groupIds,
+ final
DescribeShareGroupsOptions options) {
+ SimpleAdminApiFuture<CoordinatorKey, ShareGroupDescription> future =
+ DescribeShareGroupsHandler.newFuture(groupIds);
+ DescribeShareGroupsHandler handler = new
DescribeShareGroupsHandler(options.includeAuthorizedOperations(), logContext);
+ invokeDriver(handler, future, options.timeoutMs);
+ return new DescribeShareGroupsResult(future.all().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey().idValue,
Map.Entry::getValue)));
+ }
+
+ @Override
+ public ListShareGroupsResult listShareGroups(ListShareGroupsOptions
options) {
+ final KafkaFutureImpl<Collection<Object>> all = new
KafkaFutureImpl<>();
+ final long nowMetadata = time.milliseconds();
+ final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+ runnable.call(new Call("findAllBrokers", deadline, new
LeastLoadedNodeProvider()) {
+ @Override
+ MetadataRequest.Builder createRequest(int timeoutMs) {
+ return new MetadataRequest.Builder(new MetadataRequestData()
+ .setTopics(Collections.emptyList())
+ .setAllowAutoTopicCreation(true));
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ MetadataResponse metadataResponse = (MetadataResponse)
abstractResponse;
+ Collection<Node> nodes = metadataResponse.brokers();
+ if (nodes.isEmpty())
+ throw new StaleMetadataException("Metadata fetch failed
due to missing broker list");
+
+ HashSet<Node> allNodes = new HashSet<>(nodes);
+ final ListShareGroupsResults results = new
ListShareGroupsResults(allNodes, all);
+
+ for (final Node node : allNodes) {
+ final long nowList = time.milliseconds();
+ runnable.call(new Call("listShareGroups", deadline, new
ConstantNodeIdProvider(node.id())) {
+ @Override
+ ListGroupsRequest.Builder createRequest(int timeoutMs)
{
+ List<String> states = options.states()
+ .stream()
+ .map(ShareGroupState::toString)
+ .collect(Collectors.toList());
+ List<String> types =
Collections.singletonList(GroupType.SHARE.toString());
+ return new ListGroupsRequest.Builder(new
ListGroupsRequestData()
+ .setStatesFilter(states)
+ .setTypesFilter(types)
+ );
+ }
+
+ private void
maybeAddShareGroup(ListGroupsResponseData.ListedGroup group) {
+ final String groupId = group.groupId();
+ final Optional<ShareGroupState> state =
group.groupState().isEmpty()
+ ? Optional.empty()
+ :
Optional.of(ShareGroupState.parse(group.groupState()));
+ final ShareGroupListing groupListing = new
ShareGroupListing(groupId, state);
+ results.addListing(groupListing);
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse)
{
+ final ListGroupsResponse response =
(ListGroupsResponse) abstractResponse;
+ synchronized (results) {
+ Errors error =
Errors.forCode(response.data().errorCode());
+ if (error ==
Errors.COORDINATOR_LOAD_IN_PROGRESS || error ==
Errors.COORDINATOR_NOT_AVAILABLE) {
+ throw error.exception();
+ } else if (error != Errors.NONE) {
+ results.addError(error.exception(), node);
+ } else {
+ for (ListGroupsResponseData.ListedGroup
group : response.data().groups()) {
+ maybeAddShareGroup(group);
+ }
+ }
+ results.tryComplete(node);
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ synchronized (results) {
+ results.addError(throwable, node);
+ results.tryComplete(node);
+ }
+ }
+ }, nowList);
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ KafkaException exception = new KafkaException("Failed to find
brokers to send ListGroups", throwable);
+ all.complete(Collections.singletonList(exception));
+ }
+ }, nowMetadata);
+
+ return new ListShareGroupsResult(all);
+ }
+
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java
new file mode 100644
index 00000000000..61f0aa40eb2
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.ShareGroupState;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Options for {@link Admin#listShareGroups(ListShareGroupsOptions)}.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class ListShareGroupsOptions extends
AbstractOptions<ListShareGroupsOptions> {
+
+ private Set<ShareGroupState> states = Collections.emptySet();
+
+ /**
+ * If states is set, only groups in these states will be returned.
Otherwise, all groups are returned.
+ */
+ public ListShareGroupsOptions inStates(Set<ShareGroupState> states) {
+ this.states = (states == null) ? Collections.emptySet() : new
HashSet<>(states);
+ return this;
+ }
+
+ /**
+ * Return the list of States that are requested or empty if no states have
been specified.
+ */
+ public Set<ShareGroupState> states() {
+ return states;
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsResult.java
new file mode 100644
index 00000000000..10d325bae54
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsResult.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * The result of the {@link Admin#listShareGroups(ListShareGroupsOptions)}
call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class ListShareGroupsResult {
+
+ private final KafkaFutureImpl<Collection<ShareGroupListing>> all;
+ private final KafkaFutureImpl<Collection<ShareGroupListing>> valid;
+ private final KafkaFutureImpl<Collection<Throwable>> errors;
+
+ ListShareGroupsResult(KafkaFuture<Collection<Object>> future) {
+ this.all = new KafkaFutureImpl<>();
+ this.valid = new KafkaFutureImpl<>();
+ this.errors = new KafkaFutureImpl<>();
+ future.thenApply((KafkaFuture.BaseFunction<Collection<Object>, Void>)
results -> {
+ ArrayList<Throwable> curErrors = new ArrayList<>();
+ ArrayList<ShareGroupListing> curValid = new ArrayList<>();
+ for (Object resultObject : results) {
+ if (resultObject instanceof Throwable) {
+ curErrors.add((Throwable) resultObject);
+ } else {
+ curValid.add((ShareGroupListing) resultObject);
+ }
+ }
+ if (!curErrors.isEmpty()) {
+ all.completeExceptionally(curErrors.get(0));
+ } else {
+ all.complete(curValid);
+ }
+ valid.complete(curValid);
+ errors.complete(curErrors);
+ return null;
+ });
+ }
+
+ /**
+ * Returns a future that yields either an exception, or the full set of
share group listings.
+ */
+ public KafkaFuture<Collection<ShareGroupListing>> all() {
+ return all;
+ }
+
+ /**
+ * Returns a future which yields just the valid listings.
+ */
+ public KafkaFuture<Collection<ShareGroupListing>> valid() {
+ return valid;
+ }
+
+ /**
+ * Returns a future which yields just the errors which occurred.
+ */
+ public KafkaFuture<Collection<Throwable>> errors() {
+ return errors;
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
new file mode 100644
index 00000000000..30d88e31192
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.ShareGroupState;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A detailed description of a single share group in the cluster.
+ */
[email protected]
+public class ShareGroupDescription {
+ private final String groupId;
+ private final Collection<MemberDescription> members;
+ private final ShareGroupState state;
+ private final Node coordinator;
+ private final Set<AclOperation> authorizedOperations;
+
+ public ShareGroupDescription(String groupId,
+ Collection<MemberDescription> members,
+ ShareGroupState state,
+ Node coordinator) {
+ this(groupId, members, state, coordinator, Collections.emptySet());
+ }
+
+ public ShareGroupDescription(String groupId,
+ Collection<MemberDescription> members,
+ ShareGroupState state,
+ Node coordinator,
+ Set<AclOperation> authorizedOperations) {
+ this.groupId = groupId == null ? "" : groupId;
+ this.members = members == null ? Collections.emptyList() :
+ Collections.unmodifiableList(new ArrayList<>(members));
+ this.state = state;
+ this.coordinator = coordinator;
+ this.authorizedOperations = authorizedOperations;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final ShareGroupDescription that = (ShareGroupDescription) o;
+ return Objects.equals(groupId, that.groupId) &&
+ Objects.equals(members, that.members) &&
+ state == that.state &&
+ Objects.equals(coordinator, that.coordinator) &&
+ Objects.equals(authorizedOperations, that.authorizedOperations);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupId, members, state, coordinator,
authorizedOperations);
+ }
+
+ /**
+ * The id of the share group.
+ */
+ public String groupId() {
+ return groupId;
+ }
+
+ /**
+ * A list of the members of the share group.
+ */
+ public Collection<MemberDescription> members() {
+ return members;
+ }
+
+ /**
+ * The share group state, or UNKNOWN if the state is too new for us to
parse.
+ */
+ public ShareGroupState state() {
+ return state;
+ }
+
+ /**
+ * The share group coordinator, or null if the coordinator is not known.
+ */
+ public Node coordinator() {
+ return coordinator;
+ }
+
+ /**
+ * authorizedOperations for this group, or null if that information is not
known.
+ */
+ public Set<AclOperation> authorizedOperations() {
+ return authorizedOperations;
+ }
+
+ @Override
+ public String toString() {
+ return "(groupId=" + groupId +
+ ", members=" +
members.stream().map(MemberDescription::toString).collect(Collectors.joining(","))
+
+ ", state=" + state +
+ ", coordinator=" + coordinator +
+ ", authorizedOperations=" + authorizedOperations +
+ ")";
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java
new file mode 100644
index 00000000000..54af225d2ca
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.ShareGroupState;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A listing of a share group in the cluster.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class ShareGroupListing {
+ private final String groupId;
+ private final Optional<ShareGroupState> state;
+
+ /**
+ * Create an instance with the specified parameters.
+ *
+ * @param groupId Group Id
+ */
+ public ShareGroupListing(String groupId) {
+ this(groupId, Optional.empty());
+ }
+
+ /**
+ * Create an instance with the specified parameters.
+ *
+ * @param groupId Group Id
+ * @param state The state of the share group
+ */
+ public ShareGroupListing(String groupId, Optional<ShareGroupState> state) {
+ this.groupId = groupId;
+ this.state = Objects.requireNonNull(state);
+ }
+
+ /**
+ * The id of the share group.
+ */
+ public String groupId() {
+ return groupId;
+ }
+
+ /**
+ * The share group state.
+ */
+ public Optional<ShareGroupState> state() {
+ return state;
+ }
+
+ @Override
+ public String toString() {
+ return "(" +
+ "groupId='" + groupId + '\'' +
+ ", state=" + state +
+ ')';
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupId, state);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ if (!(o instanceof ShareGroupListing))
+ return false;
+ if (getClass() != o.getClass())
+ return false;
+ ShareGroupListing that = (ShareGroupListing) o;
+ return Objects.equals(groupId, that.groupId) &&
+ Objects.equals(state, that.state);
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
new file mode 100644
index 00000000000..97c44f3e784
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.ShareGroupDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.ShareGroupState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.message.ShareGroupDescribeRequestData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeShareGroupsHandler extends
AdminApiHandler.Batched<CoordinatorKey, ShareGroupDescription> {
+
+ private final boolean includeAuthorizedOperations;
+ private final Logger log;
+ private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+ public DescribeShareGroupsHandler(
+ boolean includeAuthorizedOperations,
+ LogContext logContext) {
+ this.includeAuthorizedOperations = includeAuthorizedOperations;
+ this.log = logContext.logger(DescribeShareGroupsHandler.class);
+ this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP,
logContext);
+ }
+
+ private static Set<CoordinatorKey> buildKeySet(Collection<String>
groupIds) {
+ return groupIds.stream()
+ .map(CoordinatorKey::byGroupId)
+ .collect(Collectors.toSet());
+ }
+
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
ShareGroupDescription> newFuture(Collection<String> groupIds) {
+ return AdminApiFuture.forKeys(buildKeySet(groupIds));
+ }
+
+ @Override
+ public String apiName() {
+ return "describeShareGroups";
+ }
+
+ @Override
+ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+ return lookupStrategy;
+ }
+
+ @Override
+ public ShareGroupDescribeRequest.Builder buildBatchedRequest(int
coordinatorId, Set<CoordinatorKey> keys) {
+ List<String> groupIds = keys.stream().map(key -> {
+ if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
+ throw new IllegalArgumentException("Invalid transaction
coordinator key " + key +
+ " when building `DescribeShareGroups` request");
+ }
+ return key.idValue;
+ }).collect(Collectors.toList());
+ ShareGroupDescribeRequestData data = new
ShareGroupDescribeRequestData()
+ .setGroupIds(groupIds)
+ .setIncludeAuthorizedOperations(includeAuthorizedOperations);
+ return new ShareGroupDescribeRequest.Builder(data, true);
+ }
+
+ @Override
+ public ApiResult<CoordinatorKey, ShareGroupDescription> handleResponse(
+ Node coordinator,
+ Set<CoordinatorKey> groupIds,
+ AbstractResponse abstractResponse) {
+ final ShareGroupDescribeResponse response =
(ShareGroupDescribeResponse) abstractResponse;
+ final Map<CoordinatorKey, ShareGroupDescription> completed = new
HashMap<>();
+ final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+
+ for (ShareGroupDescribeResponseData.DescribedGroup describedGroup :
response.data().groups()) {
+ CoordinatorKey groupIdKey =
CoordinatorKey.byGroupId(describedGroup.groupId());
+ Errors error = Errors.forCode(describedGroup.errorCode());
+ if (error != Errors.NONE) {
+ handleError(groupIdKey, error, describedGroup.errorMessage(),
failed, groupsToUnmap);
+ continue;
+ }
+
+ final List<MemberDescription> memberDescriptions = new
ArrayList<>(describedGroup.members().size());
+ final Set<AclOperation> authorizedOperations =
validAclOperations(describedGroup.authorizedOperations());
+
+ describedGroup.members().forEach(groupMember ->
+ memberDescriptions.add(new MemberDescription(
+ groupMember.memberId(),
+ groupMember.clientId(),
+ groupMember.clientHost(),
+ new
MemberAssignment(convertAssignment(groupMember.assignment()))
+ ))
+ );
+
+ final ShareGroupDescription shareGroupDescription =
+ new ShareGroupDescription(groupIdKey.idValue,
+ memberDescriptions,
+ ShareGroupState.parse(describedGroup.groupState()),
+ coordinator,
+ authorizedOperations);
+ completed.put(groupIdKey, shareGroupDescription);
+ }
+
+ return new ApiResult<>(completed, failed, new
ArrayList<>(groupsToUnmap));
+ }
+
+ private Set<TopicPartition>
convertAssignment(ShareGroupDescribeResponseData.Assignment assignment) {
+ return assignment.topicPartitions().stream().flatMap(topic ->
+ topic.partitions().stream().map(partition ->
+ new TopicPartition(topic.topicName(), partition)
+ )
+ ).collect(Collectors.toSet());
+ }
+
+ private void handleError(
+ CoordinatorKey groupId,
+ Errors error,
+ String errorMsg,
+ Map<CoordinatorKey, Throwable> failed,
+ Set<CoordinatorKey> groupsToUnmap) {
+ switch (error) {
+ case GROUP_AUTHORIZATION_FAILED:
+ log.debug("`DescribeShareGroups` request for group id {}
failed due to error {}", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
+ break;
+
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ // If the coordinator is in the middle of loading, then we
just need to retry
+ log.debug("`DescribeShareGroups` request for group id {}
failed because the coordinator " +
+ "is still in the process of loading state. Will retry",
groupId.idValue);
+ break;
+
+ case COORDINATOR_NOT_AVAILABLE:
+ case NOT_COORDINATOR:
+ // If the coordinator is unavailable or there was a
coordinator change, then we unmap
+ // the key so that we retry the `FindCoordinator` request
+ log.debug("`DescribeShareGroups` request for group id {}
returned error {}. " +
+ "Will attempt to find the coordinator again and retry",
groupId.idValue, error);
+ groupsToUnmap.add(groupId);
+ break;
+
+ case GROUP_ID_NOT_FOUND:
+ log.error("`DescribeShareGroups` request for group id {}
failed because the group does not exist.", groupId.idValue);
+ failed.put(groupId, error.exception(errorMsg));
+ break;
+
+ default:
+ log.error("`DescribeShareGroups` request for group id {}
failed due to unexpected error {}", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
+ }
+ }
+
+ private Set<AclOperation> validAclOperations(final int
authorizedOperations) {
+ if (authorizedOperations ==
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) {
+ return null;
+ }
+ return Utils.from32BitField(authorizedOperations)
+ .stream()
+ .map(AclOperation::fromCode)
+ .filter(operation -> operation != AclOperation.UNKNOWN
+ && operation != AclOperation.ALL
+ && operation != AclOperation.ANY)
+ .collect(Collectors.toSet());
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/GroupType.java
b/clients/src/main/java/org/apache/kafka/common/GroupType.java
index 10bb1fc348b..eeb79ea2825 100644
--- a/clients/src/main/java/org/apache/kafka/common/GroupType.java
+++ b/clients/src/main/java/org/apache/kafka/common/GroupType.java
@@ -25,7 +25,8 @@ import java.util.stream.Collectors;
public enum GroupType {
UNKNOWN("Unknown"),
CONSUMER("Consumer"),
- CLASSIC("Classic");
+ CLASSIC("Classic"),
+ SHARE("Share");
private static final Map<String, GroupType> NAME_TO_ENUM =
Arrays.stream(values())
.collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT),
Function.identity()));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index dc74a178442..bc4ccb52f8b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
@@ -141,6 +142,7 @@ import
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetFetchRequestData;
import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -213,6 +215,7 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
import org.apache.kafka.common.requests.UnregisterBrokerResponse;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
@@ -4629,6 +4632,357 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testDescribeShareGroups() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Retriable FindCoordinatorResponse errors should be retried
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Node.noNode()));
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Node.noNode()));
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ ShareGroupDescribeResponseData data = new
ShareGroupDescribeResponseData();
+
+ // Retriable errors should be retried
+ data.groups().add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()));
+ env.kafkaClient().prepareResponse(new
ShareGroupDescribeResponse(data));
+
+ /*
+ * We need to return two responses here, one with NOT_COORDINATOR
error when calling describe share group
+ * api using coordinator that has moved. This will retry whole
operation. So we need to again respond with a
+ * FindCoordinatorResponse.
+ *
+ * And the same reason for COORDINATOR_NOT_AVAILABLE error response
+ */
+ data = new ShareGroupDescribeResponseData();
+ data.groups().add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setErrorCode(Errors.NOT_COORDINATOR.code()));
+ env.kafkaClient().prepareResponse(new
ShareGroupDescribeResponse(data));
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ data = new ShareGroupDescribeResponseData();
+ data.groups().add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+ env.kafkaClient().prepareResponse(new
ShareGroupDescribeResponse(data));
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ data = new ShareGroupDescribeResponseData();
+ ShareGroupDescribeResponseData.TopicPartitions topicPartitions =
new ShareGroupDescribeResponseData.TopicPartitions()
+ .setTopicName("my_topic")
+ .setPartitions(asList(0, 1, 2));
+ ShareGroupDescribeResponseData.Assignment memberAssignment = new
ShareGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(asList(topicPartitions));
+ ShareGroupDescribeResponseData.Member memberOne = new
ShareGroupDescribeResponseData.Member()
+ .setMemberId("0")
+ .setClientId("clientId0")
+ .setClientHost("clientHost")
+ .setAssignment(memberAssignment);
+ ShareGroupDescribeResponseData.Member memberTwo = new
ShareGroupDescribeResponseData.Member()
+ .setMemberId("1")
+ .setClientId("clientId1")
+ .setClientHost("clientHost")
+ .setAssignment(memberAssignment);
+
+ ShareGroupDescribeResponseData group0Data = new
ShareGroupDescribeResponseData();
+ group0Data.groups().add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setGroupState(ShareGroupState.STABLE.toString())
+ .setMembers(asList(memberOne, memberTwo)));
+
+ final List<TopicPartition> expectedTopicPartitions = new
ArrayList<>();
+ expectedTopicPartitions.add(0, new TopicPartition("my_topic", 0));
+ expectedTopicPartitions.add(1, new TopicPartition("my_topic", 1));
+ expectedTopicPartitions.add(2, new TopicPartition("my_topic", 2));
+
+ List<MemberDescription> expectedMemberDescriptions = new
ArrayList<>();
+
expectedMemberDescriptions.add(convertToMemberDescriptions(memberOne,
+ new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
+
expectedMemberDescriptions.add(convertToMemberDescriptions(memberTwo,
+ new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
+ data.groups().add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setGroupState(ShareGroupState.STABLE.toString())
+ .setMembers(asList(memberOne, memberTwo)));
+
+ env.kafkaClient().prepareResponse(new
ShareGroupDescribeResponse(data));
+
+ final DescribeShareGroupsResult result =
env.adminClient().describeShareGroups(singletonList(GROUP_ID));
+ final ShareGroupDescription groupDescription =
result.describedGroups().get(GROUP_ID).get();
+
+ assertEquals(1, result.describedGroups().size());
+ assertEquals(GROUP_ID, groupDescription.groupId());
+ assertEquals(2, groupDescription.members().size());
+ assertEquals(expectedMemberDescriptions,
groupDescription.members());
+ }
+ }
+
+ @Test
+ public void testDescribeShareGroupsWithAuthorizedOperationsOmitted()
throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ ShareGroupDescribeResponseData data = new
ShareGroupDescribeResponseData();
+
+ data.groups().add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+
.setAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED));
+
+ env.kafkaClient().prepareResponse(new
ShareGroupDescribeResponse(data));
+
+ final DescribeShareGroupsResult result =
env.adminClient().describeShareGroups(singletonList(GROUP_ID));
+ final ShareGroupDescription groupDescription =
result.describedGroups().get(GROUP_ID).get();
+
+ assertNull(groupDescription.authorizedOperations());
+ }
+ }
+
+ @Test
+ public void testDescribeMultipleShareGroups() {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ ShareGroupDescribeResponseData.TopicPartitions topicPartitions =
new ShareGroupDescribeResponseData.TopicPartitions()
+ .setTopicName("my_topic")
+ .setPartitions(asList(0, 1, 2));
+ final ShareGroupDescribeResponseData.Assignment memberAssignment =
new ShareGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(asList(topicPartitions));
+ ShareGroupDescribeResponseData group0Data = new
ShareGroupDescribeResponseData();
+ group0Data.groups().add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setGroupState(ShareGroupState.STABLE.toString())
+ .setMembers(asList(
+ new ShareGroupDescribeResponseData.Member()
+ .setMemberId("0")
+ .setClientId("clientId0")
+ .setClientHost("clientHost")
+ .setAssignment(memberAssignment),
+ new ShareGroupDescribeResponseData.Member()
+ .setMemberId("1")
+ .setClientId("clientId1")
+ .setClientHost("clientHost")
+ .setAssignment(memberAssignment))));
+
+ ShareGroupDescribeResponseData group1Data = new
ShareGroupDescribeResponseData();
+ group1Data.groups().add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("group-1")
+ .setGroupState(ShareGroupState.STABLE.toString())
+ .setMembers(asList(
+ new ShareGroupDescribeResponseData.Member()
+ .setMemberId("0")
+ .setClientId("clientId0")
+ .setClientHost("clientHost")
+ .setAssignment(memberAssignment),
+ new ShareGroupDescribeResponseData.Member()
+ .setMemberId("1")
+ .setClientId("clientId1")
+ .setClientHost("clientHost")
+ .setAssignment(memberAssignment))));
+
+ env.kafkaClient().prepareResponse(new
ShareGroupDescribeResponse(group0Data));
+ env.kafkaClient().prepareResponse(new
ShareGroupDescribeResponse(group1Data));
+
+ Collection<String> groups = new HashSet<>();
+ groups.add(GROUP_ID);
+ groups.add("group-1");
+ final DescribeShareGroupsResult result =
env.adminClient().describeShareGroups(groups);
+ assertEquals(2, result.describedGroups().size());
+ assertEquals(groups, result.describedGroups().keySet());
+ }
+ }
+
+ @Test
+ public void testListShareGroups() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(4, 0),
+ AdminClientConfig.RETRIES_CONFIG, "2")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Empty metadata response should be retried
+ env.kafkaClient().prepareResponse(
+ RequestTestUtils.metadataResponse(
+ Collections.emptyList(),
+ env.cluster().clusterResource().clusterId(),
+ -1,
+ Collections.emptyList()));
+
+ env.kafkaClient().prepareResponse(
+ RequestTestUtils.metadataResponse(
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ env.cluster().controller().id(),
+ Collections.emptyList()));
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("share-group-1")
+ .setGroupType(GroupType.SHARE.toString())
+ .setGroupState("Stable")
+ ))),
+ env.cluster().nodeById(0));
+
+ // handle retriable errors
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setGroups(Collections.emptyList())
+ ),
+ env.cluster().nodeById(1));
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ .setGroups(Collections.emptyList())
+ ),
+ env.cluster().nodeById(1));
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("share-group-2")
+ .setGroupType(GroupType.SHARE.toString())
+ .setGroupState("Stable"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("share-group-3")
+ .setGroupType(GroupType.SHARE.toString())
+ .setGroupState("Stable")
+ ))),
+ env.cluster().nodeById(1));
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("share-group-4")
+ .setGroupType(GroupType.SHARE.toString())
+ .setGroupState("Stable")
+ ))),
+ env.cluster().nodeById(2));
+
+ // fatal error
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setGroups(Collections.emptyList())),
+ env.cluster().nodeById(3));
+
+ final ListShareGroupsResult result =
env.adminClient().listShareGroups();
+ TestUtils.assertFutureError(result.all(),
UnknownServerException.class);
+
+ Collection<ShareGroupListing> listings = result.valid().get();
+ assertEquals(4, listings.size());
+
+ Set<String> groupIds = new HashSet<>();
+ for (ShareGroupListing listing : listings) {
+ groupIds.add(listing.groupId());
+ assertTrue(listing.state().isPresent());
+ }
+
+ assertEquals(Utils.mkSet("share-group-1", "share-group-2",
"share-group-3", "share-group-4"), groupIds);
+ assertEquals(1, result.errors().get().size());
+ }
+ }
+
+ @Test
+ public void testListShareGroupsMetadataFailure() throws Exception {
+ final Cluster cluster = mockCluster(3, 0);
+ final Time time = new MockTime();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster,
+ AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Empty metadata causes the request to fail since we have no list
of brokers
+ // to send the ListGroups requests to
+ env.kafkaClient().prepareResponse(
+ RequestTestUtils.metadataResponse(
+ Collections.emptyList(),
+ env.cluster().clusterResource().clusterId(),
+ -1,
+ Collections.emptyList()));
+
+ final ListShareGroupsResult result =
env.adminClient().listShareGroups();
+ TestUtils.assertFutureError(result.all(), KafkaException.class);
+ }
+ }
+
+ @Test
+ public void testListShareGroupsWithStates() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("share-group-1")
+ .setGroupType(GroupType.SHARE.toString())
+ .setGroupState("Stable"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("share-group-2")
+ .setGroupType(GroupType.SHARE.toString())
+ .setGroupState("Empty")))),
+ env.cluster().nodeById(0));
+
+ final ListShareGroupsOptions options = new
ListShareGroupsOptions();
+ final ListShareGroupsResult result =
env.adminClient().listShareGroups(options);
+ Collection<ShareGroupListing> listings = result.valid().get();
+
+ assertEquals(2, listings.size());
+ List<ShareGroupListing> expected = new ArrayList<>();
+ expected.add(new ShareGroupListing("share-group-1",
Optional.of(ShareGroupState.STABLE)));
+ expected.add(new ShareGroupListing("share-group-2",
Optional.of(ShareGroupState.EMPTY)));
+ assertEquals(expected, listings);
+ assertEquals(0, result.errors().get().size());
+ }
+ }
+
+ @Test
+ public void testListShareGroupsWithStatesOlderBrokerVersion() throws
Exception {
+ ApiVersion listGroupV4 = new ApiVersion()
+ .setApiKey(ApiKeys.LIST_GROUPS.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 4);
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ // Check we should not be able to list share groups with broker
having version < 5
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Collections.singletonList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("share-group-1")))),
+ env.cluster().nodeById(0));
+ ListShareGroupsOptions options = new ListShareGroupsOptions();
+ ListShareGroupsResult result =
env.adminClient().listShareGroups(options);
+ TestUtils.assertFutureThrows(result.all(),
UnsupportedVersionException.class);
+ }
+ }
+
@Test
public void testIncrementalAlterConfigs() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@@ -7754,6 +8108,15 @@ public class KafkaAdminClientTest {
assignment);
}
+ private static MemberDescription
convertToMemberDescriptions(ShareGroupDescribeResponseData.Member member,
+
MemberAssignment assignment) {
+ return new MemberDescription(member.memberId(),
+ Optional.empty(),
+ member.clientId(),
+ member.clientHost(),
+ assignment);
+ }
+
@Test
public void testListClientMetricsResources() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 8ff65b51a22..6758f21a3f4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1316,6 +1316,16 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
+ @Override
+ public synchronized DescribeShareGroupsResult
describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions
options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ @Override
+ public synchronized ListShareGroupsResult
listShareGroups(ListShareGroupsOptions options) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
@Override
public synchronized void close(Duration timeout) {}