This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new faf000be732 Implement kafka-streams-groups.sh --list (#18167)
faf000be732 is described below
commit faf000be732b6eff312f27191e9d95d888963dd3
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Dec 16 11:58:05 2024 +0100
Implement kafka-streams-groups.sh --list (#18167)
Implement the core of kafka-streams-groups.sh
Implement --list and its options: (only --state)
---
bin/kafka-streams-groups.sh | 17 +++
.../java/org/apache/kafka/common/GroupState.java | 2 +-
.../kafka/tools/streams/StreamsGroupCommand.java | 170 +++++++++++++++++++++
.../tools/streams/StreamsGroupCommandOptions.java | 70 +++++++++
.../tools/streams/StreamsGroupCommandUnitTest.java | 170 +++++++++++++++++++++
5 files changed, 428 insertions(+), 1 deletion(-)
diff --git a/bin/kafka-streams-groups.sh b/bin/kafka-streams-groups.sh
new file mode 100755
index 00000000000..a204f4d2c74
--- /dev/null
+++ b/bin/kafka-streams-groups.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh
org.apache.kafka.tools.streams.StreamsGroupCommand "$@"
diff --git a/clients/src/main/java/org/apache/kafka/common/GroupState.java
b/clients/src/main/java/org/apache/kafka/common/GroupState.java
index cbeec4c7341..8e7a68c9689 100644
--- a/clients/src/main/java/org/apache/kafka/common/GroupState.java
+++ b/clients/src/main/java/org/apache/kafka/common/GroupState.java
@@ -82,7 +82,7 @@ public enum GroupState {
} else if (type == GroupType.CONSUMER) {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE,
DEAD, EMPTY, ASSIGNING, RECONCILING);
} else if (type == GroupType.STREAMS) {
- return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE,
DEAD, EMPTY, ASSIGNING, RECONCILING, NOT_READY);
+ return Set.of(STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING,
NOT_READY);
} else if (type == GroupType.SHARE) {
return Set.of(STABLE, DEAD, EMPTY);
} else {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
new file mode 100644
index 00000000000..6c3d1a5631b
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import joptsimple.OptionException;
+
+public class StreamsGroupCommand {
+
+ public static void main(String[] args) {
+ StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+ try {
+ opts.checkArgs();
+
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to
list all streams groups.");
+
+ // should have exactly one action
+ long actions =
Stream.of(opts.listOpt).filter(opts.options::has).count();
+ if (actions != 1)
+ CommandLineUtils.printUsageAndExit(opts.parser, "Command must
include exactly one action: --list.");
+
+ run(opts);
+ } catch (OptionException e) {
+ CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
+ }
+ }
+
+ public static void run(StreamsGroupCommandOptions opts) {
+ try (StreamsGroupService streamsGroupService = new
StreamsGroupService(opts, Map.of())) {
+ if (opts.options.has(opts.listOpt)) {
+ streamsGroupService.listGroups();
+ } else {
+ throw new IllegalArgumentException("Unknown action!");
+ }
+ } catch (IllegalArgumentException e) {
+ CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
+ } catch (Throwable e) {
+ printError("Executing streams group command failed due to " +
e.getMessage(), Optional.of(e));
+ }
+ }
+
+ static Set<GroupState> groupStatesFromString(String input) {
+ Set<GroupState> parsedStates =
+ Arrays.stream(input.split(",")).map(s ->
GroupState.parse(s.trim())).collect(Collectors.toSet());
+ Set<GroupState> validStates =
GroupState.groupStatesForType(GroupType.STREAMS);
+ if (!validStates.containsAll(parsedStates)) {
+ throw new IllegalArgumentException("Invalid state list '" + input
+ "'. Valid states are: " +
+
validStates.stream().map(GroupState::toString).collect(Collectors.joining(",
")));
+ }
+ return parsedStates;
+ }
+
+ public static void printError(String msg, Optional<Throwable> e) {
+ System.out.println("\nError: " + msg);
+ e.ifPresent(Throwable::printStackTrace);
+ }
+
+ // Visibility for testing
+ static class StreamsGroupService implements AutoCloseable {
+ final StreamsGroupCommandOptions opts;
+ private final Admin adminClient;
+
+ public StreamsGroupService(StreamsGroupCommandOptions opts,
Map<String, String> configOverrides) {
+ this.opts = opts;
+ try {
+ this.adminClient = createAdminClient(configOverrides);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public StreamsGroupService(StreamsGroupCommandOptions opts, Admin
adminClient) {
+ this.opts = opts;
+ this.adminClient = adminClient;
+ }
+
+ public void listGroups() throws ExecutionException,
InterruptedException {
+ if (opts.options.has(opts.stateOpt)) {
+ String stateValue = opts.options.valueOf(opts.stateOpt);
+ Set<GroupState> states = (stateValue == null ||
stateValue.isEmpty())
+ ? Set.of()
+ : groupStatesFromString(stateValue);
+ List<GroupListing> listings =
listStreamsGroupsInStates(states);
+ printGroupInfo(listings);
+ } else
+ listStreamsGroups().forEach(System.out::println);
+ }
+
+ List<String> listStreamsGroups() {
+ try {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.STREAMS)));
+ Collection<GroupListing> listings = result.all().get();
+ return
listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ List<GroupListing> listStreamsGroupsInStates(Set<GroupState> states)
throws ExecutionException, InterruptedException {
+ ListGroupsResult result = adminClient.listGroups(new
ListGroupsOptions()
+ .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+ .withTypes(Set.of(GroupType.STREAMS))
+ .inGroupStates(states));
+ return new ArrayList<>(result.all().get());
+ }
+
+ private void printGroupInfo(List<GroupListing> groups) {
+ // find proper columns width
+ int maxGroupLen = 15;
+ for (GroupListing group : groups) {
+ maxGroupLen = Math.max(maxGroupLen, group.groupId().length());
+ }
+ System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP",
"STATE");
+ for (GroupListing group : groups) {
+ String groupId = group.groupId();
+ String state =
group.groupState().orElse(GroupState.UNKNOWN).toString();
+ System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId,
state);
+ }
+ }
+
+ public void close() {
+ adminClient.close();
+ }
+
+ protected Admin createAdminClient(Map<String, String> configOverrides)
throws IOException {
+ Properties props = opts.options.has(opts.commandConfigOpt) ?
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties();
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt));
+ props.putAll(configOverrides);
+ return Admin.create(props);
+ }
+ }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
new file mode 100644
index 00000000000..ced73b44bf1
--- /dev/null
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import joptsimple.OptionSpec;
+
+public class StreamsGroupCommandOptions extends CommandDefaultOptions {
+ public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s)
to connect to.";
+ public static final String LIST_DOC = "List all streams groups.";
+ public static final String TIMEOUT_MS_DOC = "The timeout that can be set
for some use cases. For example, it can be used when describing the group " +
+ "to specify the maximum amount of time in milliseconds to wait before
the group stabilizes.";
+ public static final String COMMAND_CONFIG_DOC = "Property file containing
configs to be passed to Admin Client.";
+ public static final String STATE_DOC = "When specified with '--list', it
displays the state of all groups. It can also be used to list groups with
specific states. " +
+ "Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and
Dead.";
+
+ public final OptionSpec<String> bootstrapServerOpt;
+ public final OptionSpec<Void> listOpt;
+ public final OptionSpec<Long> timeoutMsOpt;
+ public final OptionSpec<String> commandConfigOpt;
+ public final OptionSpec<String> stateOpt;
+
+
+ public StreamsGroupCommandOptions(String[] args) {
+ super(args);
+
+ bootstrapServerOpt = parser.accepts("bootstrap-server",
BOOTSTRAP_SERVER_DOC)
+ .withRequiredArg()
+ .describedAs("server to connect to")
+ .ofType(String.class);
+ listOpt = parser.accepts("list", LIST_DOC);
+ timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
+ .withRequiredArg()
+ .describedAs("timeout (ms)")
+ .ofType(Long.class)
+ .defaultsTo(5000L);
+ commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
+ .withRequiredArg()
+ .describedAs("command config property file")
+ .ofType(String.class);
+ stateOpt = parser.accepts("state", STATE_DOC)
+ .availableIf(listOpt)
+ .withOptionalArg()
+ .ofType(String.class);
+
+ options = parser.parse(args);
+ }
+
+ public void checkArgs() {
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
list streams groups.");
+
+ CommandLineUtils.checkRequiredArgs(parser, options,
bootstrapServerOpt);
+ }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
new file mode 100644
index 00000000000..ff84df127aa
--- /dev/null
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import joptsimple.OptionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class StreamsGroupCommandUnitTest {
+
+ @Test
+ public void testListStreamsGroups() throws Exception {
+ String firstGroup = "first-group";
+ String secondGroup = "second-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--list"};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ ListGroupsResult result = mock(ListGroupsResult.class);
+
when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
+ new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))
+ )));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
+ StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(cgcArgs, adminClient);
+ Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup,
secondGroup));
+
+ final Set[] foundGroups = new Set[]{Collections.emptySet()};
+ TestUtils.waitForCondition(() -> {
+ foundGroups[0] = new HashSet<>(service.listStreamsGroups());
+ return Objects.equals(expectedGroups, foundGroups[0]);
+ }, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups[0] + ".");
+ service.close();
+ }
+
+ @Test
+ public void testListWithUnrecognizedOption() {
+ String bootstrapServer = "localhost:9092";
+ String[] cgcArgs = new String[]{"--frivolous-nonsense",
"--bootstrap-server", bootstrapServer, "--list"};
+ assertThrows(OptionException.class, () ->
getStreamsGroupService(cgcArgs, new MockAdminClient()));
+ }
+
+ @Test
+ public void testListStreamsGroupsWithStates() throws Exception {
+ String firstGroup = "first-group";
+ String secondGroup = "second-group";
+ String bootstrapServer = "localhost:9092";
+
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--list", "--state"};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
+
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
+ new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))
+ )));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
+ StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(cgcArgs, adminClient);
+ Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
+ new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))));
+
+ final Set[] foundListing = new Set[]{Collections.emptySet()};
+ TestUtils.waitForCondition(() -> {
+ foundListing[0] = new
HashSet<>(service.listStreamsGroupsInStates(new
HashSet<>(Arrays.asList(GroupState.values()))));
+ return Objects.equals(expectedListing, foundListing[0]);
+ }, "Expected to show groups " + expectedListing + ", but found " +
foundListing[0]);
+
+ ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
+
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList(
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
+ )));
+
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
+ Set<GroupListing> expectedListingStable = Collections.singleton(
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)));
+
+ foundListing[0] = Collections.emptySet();
+
+ TestUtils.waitForCondition(() -> {
+ foundListing[0] = new
HashSet<>(service.listStreamsGroupsInStates(Collections.singleton(GroupState.STABLE)));
+ return Objects.equals(expectedListingStable, foundListing[0]);
+ }, "Expected to show groups " + expectedListingStable + ", but found "
+ foundListing[0]);
+ service.close();
+ }
+
+ @Test
+ public void testGroupStatesFromString() {
+ Set<GroupState> result =
StreamsGroupCommand.groupStatesFromString("empty");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.EMPTY)), result);
+ result = StreamsGroupCommand.groupStatesFromString("EMPTY");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.EMPTY)), result);
+
+ result = StreamsGroupCommand.groupStatesFromString("notready");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.NOT_READY)), result);
+ result = StreamsGroupCommand.groupStatesFromString("notReady");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.NOT_READY)), result);
+
+ result = StreamsGroupCommand.groupStatesFromString("assigning");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.ASSIGNING)), result);
+ result = StreamsGroupCommand.groupStatesFromString("ASSIGNING");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.ASSIGNING)), result);
+
+ result = StreamsGroupCommand.groupStatesFromString("RECONCILING");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.RECONCILING)), result);
+ result = StreamsGroupCommand.groupStatesFromString("reconCILING");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.RECONCILING)), result);
+
+ result = StreamsGroupCommand.groupStatesFromString("STABLE");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.STABLE)), result);
+ result = StreamsGroupCommand.groupStatesFromString("stable");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.STABLE)), result);
+
+ result = StreamsGroupCommand.groupStatesFromString("DEAD");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.DEAD)), result);
+ result = StreamsGroupCommand.groupStatesFromString("dead");
+ assertEquals(new
HashSet<>(Collections.singletonList(GroupState.DEAD)), result);
+
+ assertThrows(IllegalArgumentException.class, () ->
StreamsGroupCommand.groupStatesFromString("preparingRebalance"));
+
+ assertThrows(IllegalArgumentException.class, () ->
StreamsGroupCommand.groupStatesFromString("completingRebalance"));
+
+ assertThrows(IllegalArgumentException.class, () ->
StreamsGroupCommand.groupStatesFromString("bad, wrong"));
+
+ assertThrows(IllegalArgumentException.class, () ->
StreamsGroupCommand.groupStatesFromString(" bad, Stable"));
+
+ assertThrows(IllegalArgumentException.class, () ->
StreamsGroupCommand.groupStatesFromString(" , ,"));
+ }
+
+ StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[]
args, Admin adminClient) {
+ StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+ return new StreamsGroupCommand.StreamsGroupService(opts, adminClient);
+ }
+}