This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/kip1071 by this push:
     new faf000be732 Implement kafka-streams-groups.sh --list (#18167)
faf000be732 is described below

commit faf000be732b6eff312f27191e9d95d888963dd3
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Dec 16 11:58:05 2024 +0100

    Implement kafka-streams-groups.sh --list (#18167)
    
    Implement the core of kafka-streams-groups.sh
    Implement --list and its options: (only --state)
---
 bin/kafka-streams-groups.sh                        |  17 +++
 .../java/org/apache/kafka/common/GroupState.java   |   2 +-
 .../kafka/tools/streams/StreamsGroupCommand.java   | 170 +++++++++++++++++++++
 .../tools/streams/StreamsGroupCommandOptions.java  |  70 +++++++++
 .../tools/streams/StreamsGroupCommandUnitTest.java | 170 +++++++++++++++++++++
 5 files changed, 428 insertions(+), 1 deletion(-)

diff --git a/bin/kafka-streams-groups.sh b/bin/kafka-streams-groups.sh
new file mode 100755
index 00000000000..a204f4d2c74
--- /dev/null
+++ b/bin/kafka-streams-groups.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh 
org.apache.kafka.tools.streams.StreamsGroupCommand "$@"
diff --git a/clients/src/main/java/org/apache/kafka/common/GroupState.java 
b/clients/src/main/java/org/apache/kafka/common/GroupState.java
index cbeec4c7341..8e7a68c9689 100644
--- a/clients/src/main/java/org/apache/kafka/common/GroupState.java
+++ b/clients/src/main/java/org/apache/kafka/common/GroupState.java
@@ -82,7 +82,7 @@ public enum GroupState {
         } else if (type == GroupType.CONSUMER) {
             return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, 
DEAD, EMPTY, ASSIGNING, RECONCILING);
         } else if (type == GroupType.STREAMS) {
-            return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, 
DEAD, EMPTY, ASSIGNING, RECONCILING, NOT_READY);
+            return Set.of(STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING, 
NOT_READY);
         } else if (type == GroupType.SHARE) {
             return Set.of(STABLE, DEAD, EMPTY);
         } else {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
new file mode 100644
index 00000000000..6c3d1a5631b
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import joptsimple.OptionException;
+
+public class StreamsGroupCommand {
+
+    public static void main(String[] args) {
+        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+        try {
+            opts.checkArgs();
+
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
list all streams groups.");
+
+            // should have exactly one action
+            long actions = 
Stream.of(opts.listOpt).filter(opts.options::has).count();
+            if (actions != 1)
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must 
include exactly one action: --list.");
+
+            run(opts);
+        } catch (OptionException e) {
+            CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
+        }
+    }
+
+    public static void run(StreamsGroupCommandOptions opts) {
+        try (StreamsGroupService streamsGroupService = new 
StreamsGroupService(opts, Map.of())) {
+            if (opts.options.has(opts.listOpt)) {
+                streamsGroupService.listGroups();
+            } else {
+                throw new IllegalArgumentException("Unknown action!");
+            }
+        } catch (IllegalArgumentException e) {
+            CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
+        } catch (Throwable e) {
+            printError("Executing streams group command failed due to " + 
e.getMessage(), Optional.of(e));
+        }
+    }
+
+    static Set<GroupState> groupStatesFromString(String input) {
+        Set<GroupState> parsedStates =
+            Arrays.stream(input.split(",")).map(s -> 
GroupState.parse(s.trim())).collect(Collectors.toSet());
+        Set<GroupState> validStates = 
GroupState.groupStatesForType(GroupType.STREAMS);
+        if (!validStates.containsAll(parsedStates)) {
+            throw new IllegalArgumentException("Invalid state list '" + input 
+ "'. Valid states are: " +
+                    
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", 
")));
+        }
+        return parsedStates;
+    }
+
+    public static void printError(String msg, Optional<Throwable> e) {
+        System.out.println("\nError: " + msg);
+        e.ifPresent(Throwable::printStackTrace);
+    }
+
+    // Visibility for testing
+    static class StreamsGroupService implements AutoCloseable {
+        final StreamsGroupCommandOptions opts;
+        private final Admin adminClient;
+
+        public StreamsGroupService(StreamsGroupCommandOptions opts, 
Map<String, String> configOverrides) {
+            this.opts = opts;
+            try {
+                this.adminClient = createAdminClient(configOverrides);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public StreamsGroupService(StreamsGroupCommandOptions opts, Admin 
adminClient) {
+            this.opts = opts;
+            this.adminClient = adminClient;
+        }
+
+        public void listGroups() throws ExecutionException, 
InterruptedException {
+            if (opts.options.has(opts.stateOpt)) {
+                String stateValue = opts.options.valueOf(opts.stateOpt);
+                Set<GroupState> states = (stateValue == null || 
stateValue.isEmpty())
+                    ? Set.of()
+                    : groupStatesFromString(stateValue);
+                List<GroupListing> listings = 
listStreamsGroupsInStates(states);
+                printGroupInfo(listings);
+            } else
+                listStreamsGroups().forEach(System.out::println);
+        }
+
+        List<String> listStreamsGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                    
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.STREAMS)));
+                Collection<GroupListing> listings = result.all().get();
+                return 
listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        List<GroupListing> listStreamsGroupsInStates(Set<GroupState> states) 
throws ExecutionException, InterruptedException {
+            ListGroupsResult result = adminClient.listGroups(new 
ListGroupsOptions()
+                .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                .withTypes(Set.of(GroupType.STREAMS))
+                .inGroupStates(states));
+            return new ArrayList<>(result.all().get());
+        }
+
+        private void printGroupInfo(List<GroupListing> groups) {
+            // find proper columns width
+            int maxGroupLen = 15;
+            for (GroupListing group : groups) {
+                maxGroupLen = Math.max(maxGroupLen, group.groupId().length());
+            }
+            System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", 
"STATE");
+            for (GroupListing group : groups) {
+                String groupId = group.groupId();
+                String state = 
group.groupState().orElse(GroupState.UNKNOWN).toString();
+                System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, 
state);
+            }
+        }
+
+        public void close() {
+            adminClient.close();
+        }
+
+        protected Admin createAdminClient(Map<String, String> configOverrides) 
throws IOException {
+            Properties props = opts.options.has(opts.commandConfigOpt) ? 
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties();
+            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+            props.putAll(configOverrides);
+            return Admin.create(props);
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
new file mode 100644
index 00000000000..ced73b44bf1
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import joptsimple.OptionSpec;
+
+public class StreamsGroupCommandOptions extends CommandDefaultOptions {
+    public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) 
to connect to.";
+    public static final String LIST_DOC = "List all streams groups.";
+    public static final String TIMEOUT_MS_DOC = "The timeout that can be set 
for some use cases. For example, it can be used when describing the group " +
+        "to specify the maximum amount of time in milliseconds to wait before 
the group stabilizes.";
+    public static final String COMMAND_CONFIG_DOC = "Property file containing 
configs to be passed to Admin Client.";
+    public static final String STATE_DOC = "When specified with '--list', it 
displays the state of all groups. It can also be used to list groups with 
specific states. " +
+        "Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and 
Dead.";
+
+    public final OptionSpec<String> bootstrapServerOpt;
+    public final OptionSpec<Void> listOpt;
+    public final OptionSpec<Long> timeoutMsOpt;
+    public final OptionSpec<String> commandConfigOpt;
+    public final OptionSpec<String> stateOpt;
+
+
+    public StreamsGroupCommandOptions(String[] args) {
+        super(args);
+
+        bootstrapServerOpt = parser.accepts("bootstrap-server", 
BOOTSTRAP_SERVER_DOC)
+            .withRequiredArg()
+            .describedAs("server to connect to")
+            .ofType(String.class);
+        listOpt = parser.accepts("list", LIST_DOC);
+        timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
+            .withRequiredArg()
+            .describedAs("timeout (ms)")
+            .ofType(Long.class)
+            .defaultsTo(5000L);
+        commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
+            .withRequiredArg()
+            .describedAs("command config property file")
+            .ofType(String.class);
+        stateOpt = parser.accepts("state", STATE_DOC)
+            .availableIf(listOpt)
+            .withOptionalArg()
+            .ofType(String.class);
+
+        options = parser.parse(args);
+    }
+
+    public void checkArgs() {
+        CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
list streams groups.");
+
+        CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
new file mode 100644
index 00000000000..ff84df127aa
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import joptsimple.OptionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class StreamsGroupCommandUnitTest {
+
+    @Test
+    public void testListStreamsGroups() throws Exception {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--list"};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        ListGroupsResult result = mock(ListGroupsResult.class);
+        
when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
+                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
+                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))
+        )));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
+        StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(cgcArgs, adminClient);
+        Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup, 
secondGroup));
+
+        final Set[] foundGroups = new Set[]{Collections.emptySet()};
+        TestUtils.waitForCondition(() -> {
+            foundGroups[0] = new HashSet<>(service.listStreamsGroups());
+            return Objects.equals(expectedGroups, foundGroups[0]);
+        }, "Expected --list to show groups " + expectedGroups + ", but found " 
+ foundGroups[0] + ".");
+        service.close();
+    }
+
+    @Test
+    public void testListWithUnrecognizedOption() {
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--frivolous-nonsense", 
"--bootstrap-server", bootstrapServer, "--list"};
+        assertThrows(OptionException.class, () -> 
getStreamsGroupService(cgcArgs, new MockAdminClient()));
+    }
+
+    @Test
+    public void testListStreamsGroupsWithStates() throws Exception {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--list", "--state"};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
+        
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
+                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
+                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))
+        )));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
+        StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(cgcArgs, adminClient);
+        Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
+                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
+                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))));
+
+        final Set[] foundListing = new Set[]{Collections.emptySet()};
+        TestUtils.waitForCondition(() -> {
+            foundListing[0] = new 
HashSet<>(service.listStreamsGroupsInStates(new 
HashSet<>(Arrays.asList(GroupState.values()))));
+            return Objects.equals(expectedListing, foundListing[0]);
+        }, "Expected to show groups " + expectedListing + ", but found " + 
foundListing[0]);
+
+        ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
+        
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList(
+                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
+        )));
+        
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
+        Set<GroupListing> expectedListingStable = Collections.singleton(
+                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)));
+
+        foundListing[0] = Collections.emptySet();
+
+        TestUtils.waitForCondition(() -> {
+            foundListing[0] = new 
HashSet<>(service.listStreamsGroupsInStates(Collections.singleton(GroupState.STABLE)));
+            return Objects.equals(expectedListingStable, foundListing[0]);
+        }, "Expected to show groups " + expectedListingStable + ", but found " 
+ foundListing[0]);
+        service.close();
+    }
+
+    @Test
+    public void testGroupStatesFromString() {
+        Set<GroupState> result = 
StreamsGroupCommand.groupStatesFromString("empty");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.EMPTY)), result);
+        result = StreamsGroupCommand.groupStatesFromString("EMPTY");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.EMPTY)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("notready");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.NOT_READY)), result);
+        result = StreamsGroupCommand.groupStatesFromString("notReady");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.NOT_READY)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("assigning");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.ASSIGNING)), result);
+        result = StreamsGroupCommand.groupStatesFromString("ASSIGNING");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.ASSIGNING)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("RECONCILING");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.RECONCILING)), result);
+        result = StreamsGroupCommand.groupStatesFromString("reconCILING");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.RECONCILING)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("STABLE");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.STABLE)), result);
+        result = StreamsGroupCommand.groupStatesFromString("stable");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.STABLE)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("DEAD");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.DEAD)), result);
+        result = StreamsGroupCommand.groupStatesFromString("dead");
+        assertEquals(new 
HashSet<>(Collections.singletonList(GroupState.DEAD)), result);
+
+        assertThrows(IllegalArgumentException.class, () -> 
StreamsGroupCommand.groupStatesFromString("preparingRebalance"));
+
+        assertThrows(IllegalArgumentException.class, () -> 
StreamsGroupCommand.groupStatesFromString("completingRebalance"));
+
+        assertThrows(IllegalArgumentException.class, () -> 
StreamsGroupCommand.groupStatesFromString("bad, wrong"));
+
+        assertThrows(IllegalArgumentException.class, () -> 
StreamsGroupCommand.groupStatesFromString("  bad, Stable"));
+
+        assertThrows(IllegalArgumentException.class, () -> 
StreamsGroupCommand.groupStatesFromString("   ,   ,"));
+    }
+
+    StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] 
args, Admin adminClient) {
+        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+        return new StreamsGroupCommand.StreamsGroupService(opts, adminClient);
+    }
+}

Reply via email to