This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 618ea2c1ca6 KAFKA-18285: Add describeStreamsGroup to Admin API (#19116)
618ea2c1ca6 is described below
commit 618ea2c1ca60825a86a76453d4fe60eecdc01dee
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Mar 7 15:56:07 2025 +0100
KAFKA-18285: Add describeStreamsGroup to Admin API (#19116)
Adds `describeStreamsGroup` to Admin API.
This exposes the result of the `DESCRIBE_STREAMS_GROUP` RPC in the Admin
API.
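A minimal usage sketch of the new API (the group id and bootstrap server
below are hypothetical, for illustration only):

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
    import org.apache.kafka.clients.admin.StreamsGroupDescription;

    public class DescribeStreamsGroupExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Describe one streams group; all() yields a map from group id
                // to StreamsGroupDescription once every describe succeeds.
                StreamsGroupDescription description = admin
                    .describeStreamsGroups(
                        List.of("my-streams-group"),
                        new DescribeStreamsGroupsOptions().includeAuthorizedOperations(true))
                    .all().get()
                    .get("my-streams-group");
                System.out.println(description.groupState());
            }
        }
    }

For per-group error handling, describedGroups() returns one KafkaFuture per
group id instead of the combined all() future.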
Reviewers: Bill Bejeck <[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 23 ++
.../admin/DescribeStreamsGroupsOptions.java | 41 +++
.../clients/admin/DescribeStreamsGroupsResult.java | 68 ++++
.../kafka/clients/admin/ForwardingAdmin.java | 5 +
.../kafka/clients/admin/KafkaAdminClient.java | 12 +
.../clients/admin/StreamsGroupDescription.java | 180 ++++++++++
.../admin/StreamsGroupMemberAssignment.java | 150 +++++++++
.../admin/StreamsGroupMemberDescription.java | 371 +++++++++++++++++++++
.../admin/StreamsGroupSubtopologyDescription.java | 196 +++++++++++
.../internals/DescribeStreamsGroupsHandler.java | 284 ++++++++++++++++
.../kafka/clients/admin/KafkaAdminClientTest.java | 341 +++++++++++++++++++
.../kafka/clients/admin/MockAdminClient.java | 5 +
.../TestingMetricsInterceptingAdminClient.java | 7 +
13 files changed, 1683 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 7f433736a9a..9b537f79305 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1959,6 +1959,29 @@ public interface Admin extends AutoCloseable {
*/
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds,
DeleteShareGroupsOptions options);
+ /**
+ * Describe streams groups in the cluster.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param options The options to use when describing the groups.
+ * @return The DescribeStreamsGroupsResult.
+ */
+ DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds,
+ DescribeStreamsGroupsOptions options);
+
+ /**
+ * Describe streams groups in the cluster, with the default options.
+ * <p>
+ * This is a convenience method for {@link #describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}
+ * with default options. See the overload for more details.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @return The DescribeStreamsGroupsResult.
+ */
+ default DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds) {
+ return describeStreamsGroups(groupIds, new DescribeStreamsGroupsOptions());
+ }
+
/**
* Describe some classic groups in the cluster.
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsOptions.java
new file mode 100644
index 00000000000..88e6d097768
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for {@link Admin#describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DescribeStreamsGroupsOptions extends AbstractOptions<DescribeStreamsGroupsOptions> {
+ private boolean includeAuthorizedOperations;
+
+ public DescribeStreamsGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
+ this.includeAuthorizedOperations = includeAuthorizedOperations;
+ return this;
+ }
+
+ public boolean includeAuthorizedOperations() {
+ return includeAuthorizedOperations;
+ }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsResult.java
new file mode 100644
index 00000000000..1276da2de66
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeStreamsGroupsResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * The result of the {@link KafkaAdminClient#describeStreamsGroups(Collection, DescribeStreamsGroupsOptions)} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class DescribeStreamsGroupsResult {
+
+ private final Map<String, KafkaFuture<StreamsGroupDescription>> futures;
+
+ public DescribeStreamsGroupsResult(final Map<String, KafkaFuture<StreamsGroupDescription>> futures) {
+ this.futures = Map.copyOf(futures);
+ }
+
+ /**
+ * Return a map from group id to futures which yield streams group descriptions.
+ */
+ public Map<String, KafkaFuture<StreamsGroupDescription>> describedGroups() {
+ return new HashMap<>(futures);
+ }
+
+ /**
+ * Return a future which yields all StreamsGroupDescription objects, if all the describes succeed.
+ */
+ public KafkaFuture<Map<String, StreamsGroupDescription>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture<?>[0])).thenApply(
+ nil -> {
+ Map<String, StreamsGroupDescription> descriptions = new HashMap<>(futures.size());
+ futures.forEach((key, future) -> {
+ try {
+ descriptions.put(key, future.get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, since the KafkaFuture#allOf already ensured
+ // that all of the futures completed successfully.
+ throw new RuntimeException(e);
+ }
+ });
+ return descriptions;
+ });
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
index 8fb550ad542..f4c7b4c4876 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java
@@ -333,6 +333,11 @@ public class ForwardingAdmin implements Admin {
return delegate.deleteShareGroups(groupIds, options);
}
+ @Override
+ public DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions options) {
+ return delegate.describeStreamsGroups(groupIds, options);
+ }
+
@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index af42f4421f4..f6f484ca7bf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -55,6 +55,7 @@ import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler;
+import org.apache.kafka.clients.admin.internals.DescribeStreamsGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeTransactionsHandler;
import org.apache.kafka.clients.admin.internals.FenceProducersHandler;
import org.apache.kafka.clients.admin.internals.ListConsumerGroupOffsetsHandler;
@@ -3840,6 +3841,17 @@ public class KafkaAdminClient extends AdminClient {
return new ListShareGroupOffsetsResult(future.all());
}
+ @Override
+ public DescribeStreamsGroupsResult describeStreamsGroups(final Collection<String> groupIds,
+ final DescribeStreamsGroupsOptions options) {
+ SimpleAdminApiFuture<CoordinatorKey, StreamsGroupDescription> future =
+ DescribeStreamsGroupsHandler.newFuture(groupIds);
+ DescribeStreamsGroupsHandler handler = new DescribeStreamsGroupsHandler(options.includeAuthorizedOperations(), logContext);
+ invokeDriver(handler, future, options.timeoutMs);
+ return new DescribeStreamsGroupsResult(future.all().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
+ }
+
@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
final DescribeClassicGroupsOptions options) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupDescription.java
new file mode 100644
index 00000000000..024285324a0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupDescription.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A detailed description of a single streams group in the cluster.
+ */
[email protected]
+public class StreamsGroupDescription {
+
+ private final String groupId;
+ private final int groupEpoch;
+ private final int targetAssignmentEpoch;
+ private final int topologyEpoch;
+ private final Collection<StreamsGroupSubtopologyDescription> subtopologies;
+ private final Collection<StreamsGroupMemberDescription> members;
+ private final GroupState groupState;
+ private final Node coordinator;
+ private final Set<AclOperation> authorizedOperations;
+
+ public StreamsGroupDescription(
+ final String groupId,
+ final int groupEpoch,
+ final int targetAssignmentEpoch,
+ final int topologyEpoch,
+ final Collection<StreamsGroupSubtopologyDescription> subtopologies,
+ final Collection<StreamsGroupMemberDescription> members,
+ final GroupState groupState,
+ final Node coordinator,
+ final Set<AclOperation> authorizedOperations
+ ) {
+ this.groupId = Objects.requireNonNull(groupId, "groupId must be non-null");
+ this.groupEpoch = groupEpoch;
+ this.targetAssignmentEpoch = targetAssignmentEpoch;
+ this.topologyEpoch = topologyEpoch;
+ this.subtopologies = Objects.requireNonNull(subtopologies, "subtopologies must be non-null");
+ this.members = Objects.requireNonNull(members, "members must be non-null");
+ this.groupState = Objects.requireNonNull(groupState, "groupState must be non-null");
+ this.coordinator = Objects.requireNonNull(coordinator, "coordinator must be non-null");
+ this.authorizedOperations = authorizedOperations;
+ }
+
+ /**
+ * The id of the streams group.
+ */
+ public String groupId() {
+ return groupId;
+ }
+
+ /**
+ * The epoch of the streams group.
+ */
+ public int groupEpoch() {
+ return groupEpoch;
+ }
+
+ /**
+ * The epoch of the target assignment.
+ */
+ public int targetAssignmentEpoch() {
+ return targetAssignmentEpoch;
+ }
+
+ /**
+ * The epoch of the currently used topology.
+ */
+ public int topologyEpoch() {
+ return topologyEpoch;
+ }
+
+ /**
+ * A list of the members of the streams group.
+ */
+ public Collection<StreamsGroupMemberDescription> members() {
+ return members;
+ }
+
+ /**
+ * A list of the subtopologies in the streams group.
+ */
+ public Collection<StreamsGroupSubtopologyDescription> subtopologies() {
+ return subtopologies;
+ }
+
+ /**
+ * The state of the streams group, or UNKNOWN if the state is too new for us to parse.
+ */
+ public GroupState groupState() {
+ return groupState;
+ }
+
+ /**
+ * The group coordinator, or null if the coordinator is not known.
+ */
+ public Node coordinator() {
+ return coordinator;
+ }
+
+ /**
+ * authorizedOperations for this group, or null if that information is not known.
+ */
+ public Set<AclOperation> authorizedOperations() {
+ return authorizedOperations;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final StreamsGroupDescription that = (StreamsGroupDescription) o;
+ return groupEpoch == that.groupEpoch
+ && targetAssignmentEpoch == that.targetAssignmentEpoch
+ && topologyEpoch == that.topologyEpoch
+ && Objects.equals(groupId, that.groupId)
+ && Objects.equals(subtopologies, that.subtopologies)
+ && Objects.equals(members, that.members)
+ && groupState == that.groupState
+ && Objects.equals(coordinator, that.coordinator)
+ && Objects.equals(authorizedOperations, that.authorizedOperations);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ groupId,
+ groupEpoch,
+ targetAssignmentEpoch,
+ topologyEpoch,
+ subtopologies,
+ members,
+ groupState,
+ coordinator,
+ authorizedOperations
+ );
+ }
+
+ @Override
+ public String toString() {
+ return "(" +
+ "groupId=" + groupId +
+ ", groupEpoch=" + groupEpoch +
+ ", targetAssignmentEpoch=" + targetAssignmentEpoch +
+ ", topologyEpoch=" + topologyEpoch +
+ ", subtopologies=" +
subtopologies.stream().map(StreamsGroupSubtopologyDescription::toString).collect(Collectors.joining(","))
+
+ ", members=" +
members.stream().map(StreamsGroupMemberDescription::toString).collect(Collectors.joining(","))
+
+ ", groupState=" + groupState +
+ ", coordinator=" + coordinator +
+ ", authorizedOperations=" +
authorizedOperations.stream().map(AclOperation::toString).collect(Collectors.joining(","))
+
+ ')';
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberAssignment.java b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberAssignment.java
new file mode 100644
index 00000000000..276058143ff
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberAssignment.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A description of the assignments of a specific group member.
+ */
[email protected]
+public class StreamsGroupMemberAssignment {
+
+ private final List<TaskIds> activeTasks;
+ private final List<TaskIds> standbyTasks;
+ private final List<TaskIds> warmupTasks;
+
+ public StreamsGroupMemberAssignment(
+ final List<TaskIds> activeTasks,
+ final List<TaskIds> standbyTasks,
+ final List<TaskIds> warmupTasks
+ ) {
+ this.activeTasks = activeTasks;
+ this.standbyTasks = standbyTasks;
+ this.warmupTasks = warmupTasks;
+ }
+
+ /**
+ * Active tasks for this client.
+ */
+ public List<TaskIds> activeTasks() {
+ return List.copyOf(activeTasks);
+ }
+
+ /**
+ * Standby tasks for this client.
+ */
+ public List<TaskIds> standbyTasks() {
+ return List.copyOf(standbyTasks);
+ }
+
+ /**
+ * Warmup tasks for this client.
+ */
+ public List<TaskIds> warmupTasks() {
+ return List.copyOf(warmupTasks);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final StreamsGroupMemberAssignment that = (StreamsGroupMemberAssignment) o;
+ return Objects.equals(activeTasks, that.activeTasks)
+ && Objects.equals(standbyTasks, that.standbyTasks)
+ && Objects.equals(warmupTasks, that.warmupTasks);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ activeTasks,
+ standbyTasks,
+ warmupTasks
+ );
+ }
+
+ @Override
+ public String toString() {
+ return "(" +
+ "activeTasks=" +
activeTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
+ ", standbyTasks=" +
standbyTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
+ ", warmupTasks=" +
warmupTasks.stream().map(TaskIds::toString).collect(Collectors.joining(",")) +
+ ')';
+ }
+
+ /**
+ * All tasks for one subtopology of a member.
+ */
+ public static class TaskIds {
+ private final String subtopologyId;
+ private final List<Integer> partitions;
+
+ public TaskIds(final String subtopologyId, final List<Integer> partitions) {
+ this.subtopologyId = Objects.requireNonNull(subtopologyId, "subtopologyId must be non-null");
+ this.partitions = Objects.requireNonNull(partitions, "partitions must be non-null");
+ }
+
+ /**
+ * The subtopology identifier.
+ */
+ public String subtopologyId() {
+ return subtopologyId;
+ }
+
+ /**
+ * The partitions of the subtopology processed by this member.
+ */
+ public List<Integer> partitions() {
+ return List.copyOf(partitions);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TaskIds taskIds = (TaskIds) o;
+ return Objects.equals(subtopologyId, taskIds.subtopologyId)
+ && Objects.equals(partitions, taskIds.partitions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ subtopologyId,
+ partitions
+ );
+ }
+
+ @Override
+ public String toString() {
+ return partitions.stream().map(x -> subtopologyId + "_" + x).collect(Collectors.joining(","));
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberDescription.java
new file mode 100644
index 00000000000..8f4fa126e13
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupMemberDescription.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A detailed description of a single streams group member in the cluster.
+ */
[email protected]
+public class StreamsGroupMemberDescription {
+
+ private final String memberId;
+ private final int memberEpoch;
+ private final Optional<String> instanceId;
+ private final Optional<String> rackId;
+ private final String clientId;
+ private final String clientHost;
+ private final int topologyEpoch;
+ private final String processId;
+ private final Optional<Endpoint> userEndpoint;
+ private final Map<String, String> clientTags;
+ private final List<TaskOffset> taskOffsets;
+ private final List<TaskOffset> taskEndOffsets;
+ private final StreamsGroupMemberAssignment assignment;
+ private final StreamsGroupMemberAssignment targetAssignment;
+ private final boolean isClassic;
+
+ @SuppressWarnings("ParameterNumber")
+ public StreamsGroupMemberDescription(
+ final String memberId,
+ final int memberEpoch,
+ final Optional<String> instanceId,
+ final Optional<String> rackId,
+ final String clientId,
+ final String clientHost,
+ final int topologyEpoch,
+ final String processId,
+ final Optional<Endpoint> userEndpoint,
+ final Map<String, String> clientTags,
+ final List<TaskOffset> taskOffsets,
+ final List<TaskOffset> taskEndOffsets,
+ final StreamsGroupMemberAssignment assignment,
+ final StreamsGroupMemberAssignment targetAssignment,
+ final boolean isClassic
+ ) {
+ this.memberId = Objects.requireNonNull(memberId);
+ this.memberEpoch = memberEpoch;
+ this.instanceId = Objects.requireNonNull(instanceId);
+ this.rackId = Objects.requireNonNull(rackId);
+ this.clientId = Objects.requireNonNull(clientId);
+ this.clientHost = Objects.requireNonNull(clientHost);
+ this.topologyEpoch = topologyEpoch;
+ this.processId = Objects.requireNonNull(processId);
+ this.userEndpoint = Objects.requireNonNull(userEndpoint);
+ this.clientTags = Objects.requireNonNull(clientTags);
+ this.taskOffsets = Objects.requireNonNull(taskOffsets);
+ this.taskEndOffsets = Objects.requireNonNull(taskEndOffsets);
+ this.assignment = Objects.requireNonNull(assignment);
+ this.targetAssignment = Objects.requireNonNull(targetAssignment);
+ this.isClassic = isClassic;
+ }
+
+ /**
+ * The id of the group member.
+ */
+ public String memberId() {
+ return memberId;
+ }
+
+ /**
+ * The epoch of the group member.
+ */
+ public int memberEpoch() {
+ return memberEpoch;
+ }
+
+ /**
+ * The id of the instance, used for static membership, if available.
+ */
+ public Optional<String> instanceId() {
+ return instanceId;
+ }
+
+ /**
+ * The rack ID of the group member.
+ */
+ public Optional<String> rackId() {
+ return rackId;
+ }
+
+ /**
+ * The client ID of the group member.
+ */
+ public String clientId() {
+ return clientId;
+ }
+
+ /**
+ * The host of the group member.
+ */
+ public String clientHost() {
+ return clientHost;
+ }
+
+ /**
+ * The epoch of the topology present on the client.
+ */
+ public int topologyEpoch() {
+ return topologyEpoch;
+ }
+
+ /**
+ * Identity of the streams instance that may have multiple clients.
+ */
+ public String processId() {
+ return processId;
+ }
+
+ /**
+ * User-defined endpoint for Interactive Queries.
+ */
+ public Optional<Endpoint> userEndpoint() {
+ return userEndpoint;
+ }
+
+ /**
+ * Used for rack-aware assignment algorithm.
+ */
+ public Map<String, String> clientTags() {
+ return Map.copyOf(clientTags);
+ }
+
+ /**
+ * Cumulative offsets for tasks.
+ */
+ public List<TaskOffset> taskOffsets() {
+ return List.copyOf(taskOffsets);
+ }
+
+ /**
+ * Cumulative task changelog end offsets for tasks.
+ */
+ public List<TaskOffset> taskEndOffsets() {
+ return List.copyOf(taskEndOffsets);
+ }
+
+ /**
+ * The current assignment.
+ */
+ public StreamsGroupMemberAssignment assignment() {
+ return assignment;
+ }
+
+ /**
+ * The target assignment.
+ */
+ public StreamsGroupMemberAssignment targetAssignment() {
+ return targetAssignment;
+ }
+
+ /**
+ * The flag indicating whether a member is classic.
+ */
+ public boolean isClassic() {
+ return isClassic;
+ }
+
+ @SuppressWarnings("CyclomaticComplexity")
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final StreamsGroupMemberDescription that = (StreamsGroupMemberDescription) o;
+ return memberEpoch == that.memberEpoch
+ && topologyEpoch == that.topologyEpoch
+ && isClassic == that.isClassic
+ && Objects.equals(memberId, that.memberId)
+ && Objects.equals(instanceId, that.instanceId)
+ && Objects.equals(rackId, that.rackId)
+ && Objects.equals(clientId, that.clientId)
+ && Objects.equals(clientHost, that.clientHost)
+ && Objects.equals(processId, that.processId)
+ && Objects.equals(userEndpoint, that.userEndpoint)
+ && Objects.equals(clientTags, that.clientTags)
+ && Objects.equals(taskOffsets, that.taskOffsets)
+ && Objects.equals(taskEndOffsets, that.taskEndOffsets)
+ && Objects.equals(assignment, that.assignment)
+ && Objects.equals(targetAssignment, that.targetAssignment);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ memberId,
+ memberEpoch,
+ instanceId,
+ rackId,
+ clientId,
+ clientHost,
+ topologyEpoch,
+ processId,
+ userEndpoint,
+ clientTags,
+ taskOffsets,
+ taskEndOffsets,
+ assignment,
+ targetAssignment,
+ isClassic
+ );
+ }
+
+ @Override
+ public String toString() {
+ return "(" +
+ "memberId=" + memberId +
+ ", memberEpoch=" + memberEpoch +
+ ", instanceId=" + instanceId.orElse("null") +
+ ", rackId=" + rackId.orElse("null") +
+ ", clientId=" + clientId +
+ ", clientHost=" + clientHost +
+ ", topologyEpoch=" + topologyEpoch +
+ ", processId=" + processId +
+ ", userEndpoint=" +
userEndpoint.map(Endpoint::toString).orElse("null") +
+ ", clientTags=" + clientTags +
+ ", taskOffsets=" +
taskOffsets.stream().map(TaskOffset::toString).collect(Collectors.joining(","))
+
+ ", taskEndOffsets=" +
taskEndOffsets.stream().map(TaskOffset::toString).collect(Collectors.joining(","))
+
+ ", assignment=" + assignment +
+ ", targetAssignment=" + targetAssignment +
+ ", isClassic=" + isClassic +
+ ')';
+ }
+
+ /**
+ * The user-defined endpoint for the member.
+ */
+ public static class Endpoint {
+
+ private final String host;
+ private final int port;
+
+ public Endpoint(final String host, final int port) {
+ this.host = Objects.requireNonNull(host);
+ this.port = port;
+ }
+
+ public String host() {
+ return host;
+ }
+
+ public int port() {
+ return port;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final Endpoint endpoint = (Endpoint) o;
+ return port == endpoint.port && Objects.equals(host, endpoint.host);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(host, port);
+ }
+
+ @Override
+ public String toString() {
+ return "(" +
+ "host=" + host +
+ ", port=" + port +
+ ')';
+ }
+ }
+
+ /**
+ * The cumulative offset for one task.
+ */
+ public static class TaskOffset {
+
+ private final String subtopologyId;
+ private final int partition;
+ private final long offset;
+
+ public TaskOffset(final String subtopologyId, final int partition, final long offset) {
+ this.subtopologyId = Objects.requireNonNull(subtopologyId);
+ this.partition = partition;
+ this.offset = offset;
+ }
+
+ /**
+ * The subtopology identifier.
+ */
+ public String subtopologyId() {
+ return subtopologyId;
+ }
+
+ /**
+ * The partition of the task.
+ */
+ public int partition() {
+ return partition;
+ }
+
+ /**
+ * The cumulative offset (sum of offsets in all input partitions).
+ */
+ public long offset() {
+ return offset;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TaskOffset that = (TaskOffset) o;
+ return partition == that.partition
+ && offset == that.offset
+ && Objects.equals(subtopologyId, that.subtopologyId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ subtopologyId,
+ partition,
+ offset
+ );
+ }
+
+ @Override
+ public String toString() {
+ return subtopologyId +
+ "_" + partition +
+ "=" + offset;
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupSubtopologyDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupSubtopologyDescription.java
new file mode 100644
index 00000000000..e01cafc98d4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/StreamsGroupSubtopologyDescription.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A detailed description of a subtopology in a streams group.
+ */
[email protected]
+public class StreamsGroupSubtopologyDescription {
+
+ private final String subtopologyId;
+ private final List<String> sourceTopics;
+ private final List<String> repartitionSinkTopics;
+ private final Map<String, TopicInfo> stateChangelogTopics;
+ private final Map<String, TopicInfo> repartitionSourceTopics;
+
+ public StreamsGroupSubtopologyDescription(
+ final String subtopologyId,
+ final List<String> sourceTopics,
+ final List<String> repartitionSinkTopics,
+ final Map<String, TopicInfo> stateChangelogTopics,
+ final Map<String, TopicInfo> repartitionSourceTopics
+ ) {
+ this.subtopologyId = Objects.requireNonNull(subtopologyId, "subtopologyId must be non-null");
+ this.sourceTopics = Objects.requireNonNull(sourceTopics, "sourceTopics must be non-null");
+ this.repartitionSinkTopics = Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics must be non-null");
+ this.stateChangelogTopics = Objects.requireNonNull(stateChangelogTopics, "stateChangelogTopics must be non-null");
+ this.repartitionSourceTopics = Objects.requireNonNull(repartitionSourceTopics, "repartitionSourceTopics must be non-null");
+ }
+
+ /**
+ * String to uniquely identify the subtopology.
+ */
+ public String subtopologyId() {
+ return subtopologyId;
+ }
+
+ /**
+ * The topics the topology reads from.
+ */
+ public List<String> sourceTopics() {
+ return List.copyOf(sourceTopics);
+ }
+
+ /**
+ * The repartition topics the topology writes to.
+ */
+ public List<String> repartitionSinkTopics() {
+ return List.copyOf(repartitionSinkTopics);
+ }
+
+ /**
+ * The set of state changelog topics associated with this subtopology.
+ */
+ public Map<String, TopicInfo> stateChangelogTopics() {
+ return Map.copyOf(stateChangelogTopics);
+ }
+
+ /**
+ * The set of source topics that are internally created repartition topics.
+ */
+ public Map<String, TopicInfo> repartitionSourceTopics() {
+ return Map.copyOf(repartitionSourceTopics);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final StreamsGroupSubtopologyDescription that = (StreamsGroupSubtopologyDescription) o;
+ return Objects.equals(subtopologyId, that.subtopologyId)
+ && Objects.equals(sourceTopics, that.sourceTopics)
+ && Objects.equals(repartitionSinkTopics, that.repartitionSinkTopics)
+ && Objects.equals(stateChangelogTopics, that.stateChangelogTopics)
+ && Objects.equals(repartitionSourceTopics, that.repartitionSourceTopics);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ subtopologyId,
+ sourceTopics,
+ repartitionSinkTopics,
+ stateChangelogTopics,
+ repartitionSourceTopics
+ );
+ }
+
+ @Override
+ public String toString() {
+ return "(" +
+ "subtopologyId='" + subtopologyId + '\'' +
+ ", sourceTopics=" + sourceTopics +
+ ", repartitionSinkTopics=" + repartitionSinkTopics +
+ ", stateChangelogTopics=" + stateChangelogTopics +
+ ", repartitionSourceTopics=" + repartitionSourceTopics +
+ ')';
+ }
+
+ /**
+ * Information about a topic. These configs reflect what is required by the topology, but may differ from the current state on the
+ * broker.
+ */
+ public static class TopicInfo {
+
+ private final int partitions;
+ private final int replicationFactor;
+ private final Map<String, String> topicConfigs;
+
+ public TopicInfo(final int partitions, final int replicationFactor, final Map<String, String> topicConfigs) {
+ this.partitions = partitions;
+ this.replicationFactor = replicationFactor;
+ this.topicConfigs = Objects.requireNonNull(topicConfigs, "topicConfigs must be non-null");
+ }
+
+ /**
+ * The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced.
+ */
+ public int partitions() {
+ return partitions;
+ }
+
+ /**
+ * The replication factor of the topic. Can be 0 if the default replication factor is used.
+ */
+ public int replicationFactor() {
+ return replicationFactor;
+ }
+
+ /**
+ * Topic-level configurations as key-value pairs. Default configuration can be omitted.
+ */
+ public Map<String, String> topicConfigs() {
+ return Map.copyOf(topicConfigs);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final TopicInfo topicInfo = (TopicInfo) o;
+ return partitions == topicInfo.partitions
+ && replicationFactor == topicInfo.replicationFactor
+ && Objects.equals(topicConfigs, topicInfo.topicConfigs);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ partitions,
+ replicationFactor,
+ topicConfigs
+ );
+ }
+
+ @Override
+ public String toString() {
+ return "TopicInfo(" +
+ "partitions=" + partitions +
+ ", replicationFactor=" + replicationFactor +
+ ", topicConfigs=" + topicConfigs.entrySet().stream().map(x ->
x.getKey() + "=" + x.getValue())
+ .collect(Collectors.joining(",")) +
+ ')';
+ }
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
new file mode 100644
index 00000000000..8bf793bd31c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
+import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
+import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
+import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
+import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.admin.internals.AdminUtils.validAclOperations;
+
+public class DescribeStreamsGroupsHandler extends AdminApiHandler.Batched<CoordinatorKey, StreamsGroupDescription> {
+
+ private final boolean includeAuthorizedOperations;
+ private final Logger log;
+ private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+ public DescribeStreamsGroupsHandler(
+ boolean includeAuthorizedOperations,
+ LogContext logContext) {
+ this.includeAuthorizedOperations = includeAuthorizedOperations;
+ this.log = logContext.logger(DescribeStreamsGroupsHandler.class);
+ this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
+ }
+
+ private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
+ return groupIds.stream()
+ .map(CoordinatorKey::byGroupId)
+ .collect(Collectors.toSet());
+ }
+
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, StreamsGroupDescription> newFuture(Collection<String> groupIds) {
+ return AdminApiFuture.forKeys(buildKeySet(groupIds));
+ }
+
+ @Override
+ public String apiName() {
+ return "describeStreamsGroups";
+ }
+
+ @Override
+ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+ return lookupStrategy;
+ }
+
+ @Override
+ public StreamsGroupDescribeRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
+ List<String> groupIds = keys.stream().map(key -> {
+ if (key.type != CoordinatorType.GROUP) {
+ throw new IllegalArgumentException("Invalid group coordinator key " + key +
+ " when building `DescribeStreamsGroups` request");
+ }
+ return key.idValue;
+ }).collect(Collectors.toList());
+ StreamsGroupDescribeRequestData data = new StreamsGroupDescribeRequestData()
+ .setGroupIds(groupIds)
+ .setIncludeAuthorizedOperations(includeAuthorizedOperations);
+ return new StreamsGroupDescribeRequest.Builder(data, true);
+ }
+
+ @Override
+ public ApiResult<CoordinatorKey, StreamsGroupDescription> handleResponse(
+ Node coordinator,
+ Set<CoordinatorKey> groupIds,
+ AbstractResponse abstractResponse) {
+ final StreamsGroupDescribeResponse response = (StreamsGroupDescribeResponse) abstractResponse;
+ final Map<CoordinatorKey, StreamsGroupDescription> completed = new HashMap<>();
+ final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+
+ for (StreamsGroupDescribeResponseData.DescribedGroup describedGroup : response.data().groups()) {
+ CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId());
+ Errors error = Errors.forCode(describedGroup.errorCode());
+ if (error != Errors.NONE) {
+ handleError(groupIdKey, describedGroup, coordinator, error, describedGroup.errorMessage(), completed, failed, groupsToUnmap);
+ continue;
+ }
+ if (describedGroup.topology() == null) {
+ log.error("`DescribeStreamsGroups` response for group id {} is missing the topology information", groupIdKey.idValue);
+ failed.put(groupIdKey, new IllegalStateException("Topology information is missing"));
+ continue;
+ }
+
+ final Set<AclOperation> authorizedOperations = validAclOperations(describedGroup.authorizedOperations());
+
+ final StreamsGroupDescription streamsGroupDescription = new StreamsGroupDescription(
+ describedGroup.groupId(),
+ describedGroup.groupEpoch(),
+ describedGroup.assignmentEpoch(),
+ describedGroup.topology().epoch(),
+ convertSubtopologies(describedGroup.topology().subtopologies()),
+ convertMembers(describedGroup.members()),
+ GroupState.parse(describedGroup.groupState()),
+ coordinator,
+ authorizedOperations
+ );
+ completed.put(groupIdKey, streamsGroupDescription);
+ }
+
+ return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
+ }
+
+ private Collection<StreamsGroupMemberDescription> convertMembers(final List<StreamsGroupDescribeResponseData.Member> members) {
+ final List<StreamsGroupMemberDescription> memberDescriptions = new ArrayList<>(members.size());
+ members.forEach(groupMember ->
+ memberDescriptions.add(new StreamsGroupMemberDescription(
+ groupMember.memberId(),
+ groupMember.memberEpoch(),
+ Optional.ofNullable(groupMember.instanceId()),
+ Optional.ofNullable(groupMember.rackId()),
+ groupMember.clientId(),
+ groupMember.clientHost(),
+ groupMember.topologyEpoch(),
+ groupMember.processId(),
+ Optional.ofNullable(groupMember.userEndpoint()).map(this::convertEndpoint),
+ convertClientTags(groupMember.clientTags()),
+ convertTaskOffsets(groupMember.taskOffsets()),
+ convertTaskOffsets(groupMember.taskEndOffsets()),
+ convertAssignment(groupMember.assignment()),
+ convertAssignment(groupMember.targetAssignment()),
+ groupMember.isClassic()
+ ))
+ );
+ return memberDescriptions;
+ }
+
+ private Collection<StreamsGroupSubtopologyDescription> convertSubtopologies(final List<StreamsGroupDescribeResponseData.Subtopology> subtopologies) {
+ final List<StreamsGroupSubtopologyDescription> subtopologyDescriptions = new ArrayList<>(subtopologies.size());
+ subtopologies.forEach(subtopology ->
+ subtopologyDescriptions.add(new StreamsGroupSubtopologyDescription(
+ subtopology.subtopologyId(),
+ subtopology.sourceTopics(),
+ subtopology.repartitionSinkTopics(),
+ convertTopicInfos(subtopology.stateChangelogTopics()),
+ convertTopicInfos(subtopology.repartitionSourceTopics())
+ ))
+ );
+ return subtopologyDescriptions;
+ }
+
+ private Map<String, StreamsGroupSubtopologyDescription.TopicInfo> convertTopicInfos(final List<StreamsGroupDescribeResponseData.TopicInfo> topicInfos) {
+ return topicInfos.stream().collect(Collectors.toMap(
+ StreamsGroupDescribeResponseData.TopicInfo::name,
+ topicInfo -> new StreamsGroupSubtopologyDescription.TopicInfo(
+ topicInfo.partitions(),
+ topicInfo.replicationFactor(),
+ topicInfo.topicConfigs().stream().collect(Collectors.toMap(
+ StreamsGroupDescribeResponseData.KeyValue::key,
+ StreamsGroupDescribeResponseData.KeyValue::value
+ ))
+ )
+ ));
+ }
+
+ private StreamsGroupMemberAssignment.TaskIds convertTaskIds(final StreamsGroupDescribeResponseData.TaskIds taskIds) {
+ return new StreamsGroupMemberAssignment.TaskIds(
+ taskIds.subtopologyId(),
+ taskIds.partitions()
+ );
+ }
+
+ private StreamsGroupMemberAssignment convertAssignment(final StreamsGroupDescribeResponseData.Assignment assignment) {
+ return new StreamsGroupMemberAssignment(
+ assignment.activeTasks().stream().map(this::convertTaskIds).collect(Collectors.toList()),
+ assignment.standbyTasks().stream().map(this::convertTaskIds).collect(Collectors.toList()),
+ assignment.warmupTasks().stream().map(this::convertTaskIds).collect(Collectors.toList())
+ );
+ }
+
+ private List<StreamsGroupMemberDescription.TaskOffset> convertTaskOffsets(final List<StreamsGroupDescribeResponseData.TaskOffset> taskOffsets) {
+ return taskOffsets.stream().map(taskOffset ->
+ new StreamsGroupMemberDescription.TaskOffset(
+ taskOffset.subtopologyId(),
+ taskOffset.partition(),
+ taskOffset.offset()
+ )
+ ).collect(Collectors.toList());
+ }
+
+ private Map<String, String> convertClientTags(final List<StreamsGroupDescribeResponseData.KeyValue> keyValues) {
+ return keyValues.stream().collect(Collectors.toMap(
+ StreamsGroupDescribeResponseData.KeyValue::key,
+ StreamsGroupDescribeResponseData.KeyValue::value
+ ));
+ }
+
+ private StreamsGroupMemberDescription.Endpoint convertEndpoint(final StreamsGroupDescribeResponseData.Endpoint endpoint) {
+ return new StreamsGroupMemberDescription.Endpoint(endpoint.host(), endpoint.port());
+ }
+
+
+ private void handleError(
+ CoordinatorKey groupId,
+ StreamsGroupDescribeResponseData.DescribedGroup describedGroup,
+ Node coordinator,
+ Errors error,
+ String errorMsg,
+ Map<CoordinatorKey, StreamsGroupDescription> completed,
+ Map<CoordinatorKey, Throwable> failed,
+ Set<CoordinatorKey> groupsToUnmap) {
+ switch (error) {
+ case GROUP_AUTHORIZATION_FAILED:
+ log.debug("`DescribeStreamsGroups` request for group id {} failed due to error {}", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
+ break;
+
+ case COORDINATOR_LOAD_IN_PROGRESS:
+ // If the coordinator is in the middle of loading, then we just need to retry
+ log.debug("`DescribeStreamsGroups` request for group id {} failed because the coordinator " +
+ "is still in the process of loading state. Will retry", groupId.idValue);
+ break;
+
+ case COORDINATOR_NOT_AVAILABLE:
+ case NOT_COORDINATOR:
+ // If the coordinator is unavailable or there was a coordinator change, then we unmap
+ // the key so that we retry the `FindCoordinator` request
+ log.debug("`DescribeStreamsGroups` request for group id {} returned error {}. " +
+ "Will attempt to find the coordinator again and retry", groupId.idValue, error);
+ groupsToUnmap.add(groupId);
+ break;
+
+ case GROUP_ID_NOT_FOUND:
+ // In order to maintain compatibility with describeConsumerGroups, an unknown group ID is
+ // reported as a DEAD streams group, and the admin client operation does not fail
+ log.debug("`DescribeStreamsGroups` request for group id {} failed because the group does not exist. {}",
+ groupId.idValue, errorMsg != null ? errorMsg : "");
+ final StreamsGroupDescription streamsGroupDescription =
+ new StreamsGroupDescription(
+ groupId.idValue,
+ -1,
+ -1,
+ -1,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ GroupState.DEAD,
+ coordinator,
+ validAclOperations(describedGroup.authorizedOperations()));
+ completed.put(groupId, streamsGroupDescription);
+ break;
+
+ default:
+ log.error("`DescribeStreamsGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
+ failed.put(groupId, error.exception(errorMsg));
+ }
+ }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7b888e83962..17569ed3956 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -160,6 +160,7 @@ import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequest
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -237,6 +238,7 @@ import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
+import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
import org.apache.kafka.common.requests.UnregisterBrokerResponse;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
@@ -5762,6 +5764,233 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testDescribeStreamsGroups() throws Exception {
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Retriable FindCoordinatorResponse errors should be retried
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData();
+
+ // Retriable errors should be retried
+ data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()));
+ env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
+
+ // We need to return two responses here, one with a NOT_COORDINATOR error when calling the describe streams
+ // group API using a coordinator that has moved. This will retry the whole operation, so we need to again respond with a
+ // FindCoordinatorResponse.
+ //
+ // And the same reason for COORDINATOR_NOT_AVAILABLE error response
+ data = new StreamsGroupDescribeResponseData();
+ data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setErrorCode(Errors.NOT_COORDINATOR.code()));
+ env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ data = new StreamsGroupDescribeResponseData();
+ data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(GROUP_ID)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
+ env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
+ env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+ data = makeFullStreamsGroupDescribeResponse();
+
+ env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(data));
+
+ final DescribeStreamsGroupsResult result = env.adminClient().describeStreamsGroups(singletonList(GROUP_ID));
+ final StreamsGroupDescription groupDescription = result.describedGroups().get(GROUP_ID).get();
+
+ final String subtopologyId = "my_subtopology";
+ StreamsGroupMemberAssignment.TaskIds expectedActiveTasks1 =
+ new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(0, 1, 2));
+ StreamsGroupMemberAssignment.TaskIds expectedStandbyTasks1 =
+ new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(3, 4, 5));
+ StreamsGroupMemberAssignment.TaskIds expectedWarmupTasks1 =
+ new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(6, 7, 8));
+ StreamsGroupMemberAssignment.TaskIds expectedActiveTasks2 =
+ new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(3, 4, 5));
+ StreamsGroupMemberAssignment.TaskIds expectedStandbyTasks2 =
+ new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(6, 7, 8));
+ StreamsGroupMemberAssignment.TaskIds expectedWarmupTasks2 =
+ new StreamsGroupMemberAssignment.TaskIds(subtopologyId, asList(0, 1, 2));
+ StreamsGroupMemberAssignment expectedMemberAssignment = new StreamsGroupMemberAssignment(
+ singletonList(expectedActiveTasks1),
+ singletonList(expectedStandbyTasks1),
+ singletonList(expectedWarmupTasks1)
+ );
+ StreamsGroupMemberAssignment expectedTargetAssignment = new StreamsGroupMemberAssignment(
+ singletonList(expectedActiveTasks2),
+ singletonList(expectedStandbyTasks2),
+ singletonList(expectedWarmupTasks2)
+ );
+ final String instanceId = "instance-id";
+ final String rackId = "rack-id";
+ StreamsGroupMemberDescription expectedMemberOne = new StreamsGroupMemberDescription(
+ "0",
+ 1,
+ Optional.of(instanceId),
+ Optional.of(rackId),
+ "clientId0",
+ "clientHost",
+ 0,
+ "processId",
+ Optional.of(new StreamsGroupMemberDescription.Endpoint("localhost", 8080)),
+ Collections.singletonMap("key", "value"),
+ Collections.singletonList(new StreamsGroupMemberDescription.TaskOffset(subtopologyId, 0, 0)),
+ Collections.singletonList(new StreamsGroupMemberDescription.TaskOffset(subtopologyId, 0, 1)),
+ expectedMemberAssignment,
+ expectedTargetAssignment,
+ true
+ );
+
+ StreamsGroupMemberDescription expectedMemberTwo = new StreamsGroupMemberDescription(
+ "1",
+ 2,
+ Optional.empty(),
+ Optional.empty(),
+ "clientId1",
+ "clientHost",
+ 1,
+ "processId2",
+ Optional.empty(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new StreamsGroupMemberAssignment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
+ new StreamsGroupMemberAssignment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()),
+ false
+ );
+
+ StreamsGroupSubtopologyDescription expectedSubtopologyDescription = new StreamsGroupSubtopologyDescription(
+ subtopologyId,
+ Collections.singletonList("my_source_topic"),
+ Collections.singletonList("my_repartition_sink_topic"),
+ Collections.singletonMap(
+ "my_changelog_topic",
+ new StreamsGroupSubtopologyDescription.TopicInfo(
+ 0,
+ (short) 3,
+ Collections.singletonMap("key1", "value1")
+ )
+ ),
+ Collections.singletonMap(
+ "my_repartition_topic",
+ new StreamsGroupSubtopologyDescription.TopicInfo(
+ 99,
+ (short) 0,
+ Collections.emptyMap()
+ )
+ )
+ );
+
+ assertEquals(1, result.describedGroups().size());
+ assertEquals(GROUP_ID, groupDescription.groupId());
+ assertEquals(2, groupDescription.members().size());
+ Iterator<StreamsGroupMemberDescription> members = groupDescription.members().iterator();
+ assertEquals(expectedMemberOne, members.next());
+ assertEquals(expectedMemberTwo, members.next());
+ assertEquals(1, groupDescription.subtopologies().size());
+ assertEquals(expectedSubtopologyDescription, groupDescription.subtopologies().iterator().next());
+ assertEquals(2, groupDescription.groupEpoch());
+ assertEquals(1, groupDescription.targetAssignmentEpoch());
+
+ }
+ }
+
+ @Test
+ public void testDescribeStreamsGroupsWithAuthorizedOperationsOmitted() throws Exception {
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ env.kafkaClient().prepareResponse(
+ prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ StreamsGroupDescribeResponseData data =
makeFullStreamsGroupDescribeResponse();
+
+ data.groups().iterator().next()
+
.setAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+
+ env.kafkaClient().prepareResponse(new
StreamsGroupDescribeResponse(data));
+
+ final DescribeStreamsGroupsResult result =
env.adminClient().describeStreamsGroups(singletonList(GROUP_ID));
+ final StreamsGroupDescription groupDescription =
result.describedGroups().get(GROUP_ID).get();
+
+ assertNull(groupDescription.authorizedOperations());
+ }
+ }
+
+    @Test
+    public void testDescribeMultipleStreamsGroups() {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            StreamsGroupDescribeResponseData.TaskIds activeTasks = new StreamsGroupDescribeResponseData.TaskIds()
+                .setSubtopologyId("my_subtopology")
+                .setPartitions(asList(0, 1, 2));
+            StreamsGroupDescribeResponseData.TaskIds standbyTasks = new StreamsGroupDescribeResponseData.TaskIds()
+                .setSubtopologyId("my_subtopology")
+                .setPartitions(asList(3, 4, 5));
+            StreamsGroupDescribeResponseData.TaskIds warmupTasks = new StreamsGroupDescribeResponseData.TaskIds()
+                .setSubtopologyId("my_subtopology")
+                .setPartitions(asList(6, 7, 8));
+            final StreamsGroupDescribeResponseData.Assignment memberAssignment = new StreamsGroupDescribeResponseData.Assignment()
+                .setActiveTasks(singletonList(activeTasks))
+                .setStandbyTasks(singletonList(standbyTasks))
+                .setWarmupTasks(singletonList(warmupTasks));
+            StreamsGroupDescribeResponseData group0Data = new StreamsGroupDescribeResponseData();
+            group0Data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(GROUP_ID)
+                .setGroupState(GroupState.STABLE.toString())
+                .setMembers(asList(
+                    new StreamsGroupDescribeResponseData.Member()
+                        .setMemberId("0")
+                        .setClientId("clientId0")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment),
+                    new StreamsGroupDescribeResponseData.Member()
+                        .setMemberId("1")
+                        .setClientId("clientId1")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment))));
+
+            StreamsGroupDescribeResponseData group1Data = new StreamsGroupDescribeResponseData();
+            group1Data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("group-1")
+                .setGroupState(GroupState.STABLE.toString())
+                .setMembers(asList(
+                    new StreamsGroupDescribeResponseData.Member()
+                        .setMemberId("0")
+                        .setClientId("clientId0")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment),
+                    new StreamsGroupDescribeResponseData.Member()
+                        .setMemberId("1")
+                        .setClientId("clientId1")
+                        .setClientHost("clientHost")
+                        .setAssignment(memberAssignment))));
+
+            env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(group0Data));
+            env.kafkaClient().prepareResponse(new StreamsGroupDescribeResponse(group1Data));
+
+            Collection<String> groups = new HashSet<>();
+            groups.add(GROUP_ID);
+            groups.add("group-1");
+            final DescribeStreamsGroupsResult result = env.adminClient().describeStreamsGroups(groups);
+            assertEquals(2, result.describedGroups().size());
+            assertEquals(groups, result.describedGroups().keySet());
+        }
+    }
+
     @Test
     public void testDescribeShareGroups() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
@@ -10281,4 +10510,116 @@ public class KafkaAdminClientTest {
             assertNull(result.partitionResult(barPartition0).get());
         }
     }
+
+    private static StreamsGroupDescribeResponseData makeFullStreamsGroupDescribeResponse() {
+        StreamsGroupDescribeResponseData data;
+        StreamsGroupDescribeResponseData.TaskIds activeTasks1 = new StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(0, 1, 2));
+        StreamsGroupDescribeResponseData.TaskIds standbyTasks1 = new StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(3, 4, 5));
+        StreamsGroupDescribeResponseData.TaskIds warmupTasks1 = new StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(6, 7, 8));
+        StreamsGroupDescribeResponseData.TaskIds activeTasks2 = new StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(3, 4, 5));
+        StreamsGroupDescribeResponseData.TaskIds standbyTasks2 = new StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(6, 7, 8));
+        StreamsGroupDescribeResponseData.TaskIds warmupTasks2 = new StreamsGroupDescribeResponseData.TaskIds()
+            .setSubtopologyId("my_subtopology")
+            .setPartitions(asList(0, 1, 2));
+        StreamsGroupDescribeResponseData.Assignment memberAssignment = new StreamsGroupDescribeResponseData.Assignment()
+            .setActiveTasks(singletonList(activeTasks1))
+            .setStandbyTasks(singletonList(standbyTasks1))
+            .setWarmupTasks(singletonList(warmupTasks1));
+        StreamsGroupDescribeResponseData.Assignment targetAssignment = new StreamsGroupDescribeResponseData.Assignment()
+            .setActiveTasks(singletonList(activeTasks2))
+            .setStandbyTasks(singletonList(standbyTasks2))
+            .setWarmupTasks(singletonList(warmupTasks2));
+        StreamsGroupDescribeResponseData.Member memberOne = new StreamsGroupDescribeResponseData.Member()
+            .setMemberId("0")
+            .setMemberEpoch(1)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setClientId("clientId0")
+            .setClientHost("clientHost")
+            .setTopologyEpoch(0)
+            .setProcessId("processId")
+            .setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint()
+                .setHost("localhost")
+                .setPort(8080)
+            )
+            .setClientTags(Collections.singletonList(new StreamsGroupDescribeResponseData.KeyValue()
+                .setKey("key")
+                .setValue("value")
+            ))
+            .setTaskOffsets(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskOffset()
+                .setSubtopologyId("my_subtopology")
+                .setPartition(0)
+                .setOffset(0)
+            ))
+            .setTaskEndOffsets(Collections.singletonList(new StreamsGroupDescribeResponseData.TaskOffset()
+                .setSubtopologyId("my_subtopology")
+                .setPartition(0)
+                .setOffset(1)
+            ))
+            .setAssignment(memberAssignment)
+            .setTargetAssignment(targetAssignment)
+            .setIsClassic(true);
+
+        StreamsGroupDescribeResponseData.Member memberTwo = new StreamsGroupDescribeResponseData.Member()
+            .setMemberId("1")
+            .setMemberEpoch(2)
+            .setInstanceId(null)
+            .setRackId(null)
+            .setClientId("clientId1")
+            .setClientHost("clientHost")
+            .setTopologyEpoch(1)
+            .setProcessId("processId2")
+            .setUserEndpoint(null)
+            .setClientTags(Collections.emptyList())
+            .setTaskOffsets(Collections.emptyList())
+            .setTaskEndOffsets(Collections.emptyList())
+            .setAssignment(new StreamsGroupDescribeResponseData.Assignment())
+            .setTargetAssignment(new StreamsGroupDescribeResponseData.Assignment())
+            .setIsClassic(false);
+
+        StreamsGroupDescribeResponseData.Subtopology subtopologyDescription = new StreamsGroupDescribeResponseData.Subtopology()
+            .setSubtopologyId("my_subtopology")
+            .setSourceTopics(Collections.singletonList("my_source_topic"))
+            .setRepartitionSinkTopics(Collections.singletonList("my_repartition_sink_topic"))
+            .setStateChangelogTopics(Collections.singletonList(
+                new StreamsGroupDescribeResponseData.TopicInfo()
+                    .setName("my_changelog_topic")
+                    .setPartitions(0)
+                    .setReplicationFactor((short) 3)
+                    .setTopicConfigs(Collections.singletonList(new StreamsGroupDescribeResponseData.KeyValue()
+                        .setKey("key1")
+                        .setValue("value1")
+                    ))
+            ))
+            .setRepartitionSourceTopics(Collections.singletonList(
+                new StreamsGroupDescribeResponseData.TopicInfo()
+                    .setName("my_repartition_topic")
+                    .setPartitions(99)
+                    .setReplicationFactor((short) 0)
+                    .setTopicConfigs(Collections.emptyList())
+            ));
+
+        data = new StreamsGroupDescribeResponseData();
+        data.groups().add(new StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(GROUP_ID)
+            .setGroupState(GroupState.STABLE.toString())
+            .setMembers(asList(memberOne, memberTwo))
+            .setTopology(new StreamsGroupDescribeResponseData.Topology()
+                .setEpoch(1)
+                .setSubtopologies(Collections.singletonList(subtopologyDescription))
+            )
+            .setGroupEpoch(2)
+            .setAssignmentEpoch(1));
+        return data;
+    }
 }
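
A minimal usage sketch of the new API, for readers who want to try it against a
running cluster (not part of this commit): the bootstrap address and group id
below are illustrative assumptions, and only accessors exercised by the tests
above are used.

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.StreamsGroupDescription;

    import java.util.Properties;

    import static java.util.Collections.singletonList;

    public class DescribeStreamsGroupExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Assumed bootstrap address; point this at your own cluster.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // describeStreamsGroups returns one future per requested group id.
                StreamsGroupDescription description = admin
                    .describeStreamsGroups(singletonList("my-streams-app")) // assumed group id
                    .describedGroups()
                    .get("my-streams-app")
                    .get();
                System.out.println("group: " + description.groupId()
                    + ", members: " + description.members().size()
                    + ", subtopologies: " + description.subtopologies().size()
                    + ", group epoch: " + description.groupEpoch()
                    + ", target assignment epoch: " + description.targetAssignmentEpoch());
            }
        }
    }
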
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index b61915e4a8d..9bd566ac11c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -1429,6 +1429,11 @@ public class MockAdminClient extends AdminClient {
         throw new UnsupportedOperationException("Not implemented yet");
     }
 
+    @Override
+    public synchronized DescribeStreamsGroupsResult describeStreamsGroups(Collection<String> groupIds, DescribeStreamsGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     @Override
     public synchronized DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
index ede78ca71ab..60a042aa28d 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java
@@ -91,6 +91,8 @@ import org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
+import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
+import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
 import org.apache.kafka.clients.admin.DescribeTopicsOptions;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.DescribeTransactionsOptions;
@@ -448,6 +450,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
     public DescribeShareGroupsResult describeShareGroups(final Collection<String> groupIds, final DescribeShareGroupsOptions options) {
         return adminDelegate.describeShareGroups(groupIds, options);
     }
 
+    @Override
+    public DescribeStreamsGroupsResult describeStreamsGroups(final Collection<String> groupIds, final DescribeStreamsGroupsOptions options) {
+        return adminDelegate.describeStreamsGroups(groupIds, options);
+    }
+
     @Override
     public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId, final Map<TopicPartition, Long> offsets, final AlterShareGroupOffsetsOptions options) {
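
Similarly, a sketch of describing several groups at once, mirroring
testDescribeMultipleStreamsGroups above. The group ids are illustrative, and
includeAuthorizedOperations is assumed to be the non-default option exposed by
DescribeStreamsGroupsOptions; each group's description completes or fails
independently, so the per-group futures are handled one by one.

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
    import org.apache.kafka.clients.admin.StreamsGroupDescription;
    import org.apache.kafka.common.KafkaFuture;

    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    public class DescribeMultipleStreamsGroupsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
            try (Admin admin = Admin.create(props)) {
                Map<String, KafkaFuture<StreamsGroupDescription>> futures = admin
                    .describeStreamsGroups(
                        Arrays.asList("app-a", "app-b"), // assumed group ids
                        new DescribeStreamsGroupsOptions().includeAuthorizedOperations(true))
                    .describedGroups();
                // Handle each group's future separately so one failure does not mask the rest.
                for (Map.Entry<String, KafkaFuture<StreamsGroupDescription>> entry : futures.entrySet()) {
                    try {
                        StreamsGroupDescription description = entry.getValue().get();
                        System.out.println(entry.getKey() + ": " + description.members().size() + " member(s)");
                    } catch (ExecutionException e) {
                        // e.g. the group does not exist or is not a streams group
                        System.err.println(entry.getKey() + ": " + e.getCause());
                    }
                }
            }
        }
    }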