This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa62bce63df KAFKA-18287: Add support for kafka-streams-groups.sh --list (#19422)
fa62bce63df is described below

commit fa62bce63df7b48a0af37be9625f952116ad35c5
Author: Alieh Saeedi <107070585+aliehsaee...@users.noreply.github.com>
AuthorDate: Wed Apr 9 17:57:34 2025 +0200

    KAFKA-18287: Add support for kafka-streams-groups.sh --list (#19422)
    
    Implement the core of kafka-streams-groups.sh for `KIP-1071`
    - Implement `--list` and its options: (only `--state`)
    
    Reviewers: Bruno Cadonna <cado...@apache.org>
---
 bin/kafka-streams-groups.sh                        |  17 ++
 .../kafka/tools/streams/StreamsGroupCommand.java   | 168 +++++++++++++++++++
 .../tools/streams/StreamsGroupCommandOptions.java  |  70 ++++++++
 .../tools/streams/StreamsGroupCommandTest.java     | 185 +++++++++++++++++++++
 4 files changed, 440 insertions(+)

diff --git a/bin/kafka-streams-groups.sh b/bin/kafka-streams-groups.sh
new file mode 100755
index 00000000000..a204f4d2c74
--- /dev/null
+++ b/bin/kafka-streams-groups.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Quote the script directory and $0 so installation paths containing spaces still resolve.
+exec "$(dirname "$0")/kafka-run-class.sh" org.apache.kafka.tools.streams.StreamsGroupCommand "$@"
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
new file mode 100644
index 00000000000..91a22e87281
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import joptsimple.OptionException;
+
+/**
+ * Entry point of the {@code kafka-streams-groups.sh} tool. The only action implemented here is
+ * {@code --list}, optionally combined with {@code --state}.
+ */
+public class StreamsGroupCommand {
+
+    /**
+     * Parses the command line, verifies that exactly one action was requested and runs it.
+     * An {@link OptionException} raised inside the try block is reported through the usage message.
+     *
+     * @param args raw command-line arguments
+     */
+    public static void main(String[] args) {
+        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+        try {
+            opts.checkArgs();
+
+            // should have exactly one action
+            long numberOfActions = Stream.of(opts.listOpt).filter(opts.options::has).count();
+            if (numberOfActions != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list.");
+            }
+
+            run(opts);
+        } catch (OptionException e) {
+            CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
+        }
+    }
+
+    /**
+     * Executes the action selected in {@code opts} against a freshly created admin client, which is
+     * closed when the action finishes.
+     * An {@link IllegalArgumentException} is reported through the usage message; any other failure
+     * is printed via {@link #printError(String, Optional)}.
+     *
+     * @param opts parsed command-line options
+     */
+    public static void run(StreamsGroupCommandOptions opts) {
+        try (StreamsGroupService streamsGroupService = new StreamsGroupService(opts, Map.of())) {
+            if (opts.options.has(opts.listOpt)) {
+                streamsGroupService.listGroups();
+            } else {
+                throw new IllegalArgumentException("Unknown action!");
+            }
+        } catch (IllegalArgumentException e) {
+            CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
+        } catch (Throwable e) {
+            printError("Executing streams group command failed due to " + e.getMessage(), Optional.of(e));
+        }
+    }
+
+    /**
+     * Converts a comma-separated list of state names into group states. Each element is trimmed
+     * before it is handed to {@link GroupState#parse(String)}.
+     *
+     * @param input comma-separated state names
+     * @return the parsed states
+     * @throws IllegalArgumentException if any parsed state is not among the states returned by
+     *         {@code GroupState.groupStatesForType(GroupType.STREAMS)}
+     */
+    static Set<GroupState> groupStatesFromString(String input) {
+        Set<GroupState> parsedStates =
+            Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet());
+        Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.STREAMS);
+        if (!validStates.containsAll(parsedStates)) {
+            throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " +
+                    validStates.stream().map(GroupState::toString).collect(Collectors.joining(", ")));
+        }
+        return parsedStates;
+    }
+
+    /**
+     * Prints {@code msg} prefixed with "Error: " to standard output and, when a throwable is given,
+     * its stack trace via {@link Throwable#printStackTrace()}.
+     *
+     * @param msg error description
+     * @param e   optional cause whose stack trace should be printed
+     */
+    public static void printError(String msg, Optional<Throwable> e) {
+        System.out.println("\nError: " + msg);
+        e.ifPresent(Throwable::printStackTrace);
+    }
+
+    /**
+     * Performs the group operations on top of an {@link Admin} client and closes that client in
+     * {@link #close()}.
+     */
+    // Visibility for testing
+    static class StreamsGroupService implements AutoCloseable {
+        final StreamsGroupCommandOptions opts;
+        private final Admin adminClient;
+
+        /**
+         * Creates a service with an admin client built from the command-line options.
+         *
+         * @param opts            parsed command-line options
+         * @param configOverrides admin client properties applied on top of those from the options
+         * @throws RuntimeException wrapping the {@link IOException} thrown while creating the client
+         */
+        public StreamsGroupService(StreamsGroupCommandOptions opts, Map<String, String> configOverrides) {
+            this.opts = opts;
+            try {
+                this.adminClient = createAdminClient(configOverrides);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Creates a service around an existing admin client; used by the tests to inject a mock.
+         *
+         * @param opts        parsed command-line options
+         * @param adminClient admin client to use; it is closed by {@link #close()}
+         */
+        public StreamsGroupService(StreamsGroupCommandOptions opts, Admin adminClient) {
+            this.opts = opts;
+            this.adminClient = adminClient;
+        }
+
+        /**
+         * Prints the streams groups to standard output. With {@code --state} a GROUP/STATE table is
+         * printed, filtered by the given states if a non-empty value was supplied; without it only
+         * the group ids are printed, one per line.
+         */
+        public void listGroups() throws ExecutionException, InterruptedException {
+            if (opts.options.has(opts.stateOpt)) {
+                String stateValue = opts.options.valueOf(opts.stateOpt);
+                // --state without a value means "show the state column, do not filter".
+                Set<GroupState> states = (stateValue == null || stateValue.isEmpty())
+                    ? Set.of()
+                    : groupStatesFromString(stateValue);
+                List<GroupListing> listings = listStreamsGroupsInStates(states);
+                printGroupInfo(listings);
+            } else {
+                listStreamsGroups().forEach(System.out::println);
+            }
+        }
+
+        /**
+         * Lists the ids of all groups of type {@link GroupType#STREAMS}.
+         *
+         * @return the group ids
+         * @throws RuntimeException wrapping the failure if the request is interrupted or fails
+         */
+        List<String> listStreamsGroups() {
+            try {
+                ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
+                    .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                    .withTypes(Set.of(GroupType.STREAMS)));
+                Collection<GroupListing> listings = result.all().get();
+                return listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers can still observe the interruption.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
+         * Lists the groups of type {@link GroupType#STREAMS}, passing {@code states} to
+         * {@code ListGroupsOptions.inGroupStates}.
+         *
+         * @param states states to filter by
+         * @return the matching group listings
+         */
+        List<GroupListing> listStreamsGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
+            ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
+                .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
+                .withTypes(Set.of(GroupType.STREAMS))
+                .inGroupStates(states));
+            return new ArrayList<>(result.all().get());
+        }
+
+        /** Prints a left-aligned GROUP/STATE table; the GROUP column is at least 15 characters wide. */
+        private void printGroupInfo(List<GroupListing> groups) {
+            // find proper columns width
+            int maxGroupLen = 15;
+            for (GroupListing group : groups) {
+                maxGroupLen = Math.max(maxGroupLen, group.groupId().length());
+            }
+            // A negative width in the format specifier left-aligns the column.
+            System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", "STATE");
+            for (GroupListing group : groups) {
+                String groupId = group.groupId();
+                String state = group.groupState().orElse(GroupState.UNKNOWN).toString();
+                System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, state);
+            }
+        }
+
+        /** Closes the underlying admin client. */
+        @Override
+        public void close() {
+            adminClient.close();
+        }
+
+        /**
+         * Builds the admin client from the optional {@code --command-config} file, the bootstrap
+         * server option and finally {@code configOverrides}, which take precedence.
+         *
+         * @param configOverrides properties applied last
+         * @return the created admin client
+         * @throws IOException if the command config file cannot be loaded
+         */
+        protected Admin createAdminClient(Map<String, String> configOverrides) throws IOException {
+            Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties();
+            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
+            props.putAll(configOverrides);
+            return Admin.create(props);
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
new file mode 100644
index 00000000000..ced73b44bf1
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import joptsimple.OptionSpec;
+
+/**
+ * Command-line options accepted by the streams group command.
+ * Every option is registered on the inherited {@code parser}; the parse result is stored in the
+ * inherited {@code options} field by the constructor.
+ */
+public class StreamsGroupCommandOptions extends CommandDefaultOptions {
+    // Help texts attached to the individual options below.
+    public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) 
to connect to.";
+    public static final String LIST_DOC = "List all streams groups.";
+    public static final String TIMEOUT_MS_DOC = "The timeout that can be set 
for some use cases. For example, it can be used when describing the group " +
+        "to specify the maximum amount of time in milliseconds to wait before 
the group stabilizes.";
+    public static final String COMMAND_CONFIG_DOC = "Property file containing 
configs to be passed to Admin Client.";
+    public static final String STATE_DOC = "When specified with '--list', it 
displays the state of all groups. It can also be used to list groups with 
specific states. " +
+        "Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and 
Dead.";
+
+    // Option handles; callers query the parsed `options` with these.
+    public final OptionSpec<String> bootstrapServerOpt;
+    public final OptionSpec<Void> listOpt;
+    public final OptionSpec<Long> timeoutMsOpt;
+    public final OptionSpec<String> commandConfigOpt;
+    public final OptionSpec<String> stateOpt;
+
+
+    /**
+     * Registers all options and parses {@code args}.
+     * Parsing happens here, so an unrecognized option makes the constructor itself throw
+     * (the accompanying test expects a joptsimple OptionException).
+     *
+     * @param args raw command-line arguments
+     */
+    public StreamsGroupCommandOptions(String[] args) {
+        super(args);
+
+        bootstrapServerOpt = parser.accepts("bootstrap-server", 
BOOTSTRAP_SERVER_DOC)
+            .withRequiredArg()
+            .describedAs("server to connect to")
+            .ofType(String.class);
+        // --list is a flag without an argument.
+        listOpt = parser.accepts("list", LIST_DOC);
+        // --timeout falls back to 5000 ms when not given.
+        timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
+            .withRequiredArg()
+            .describedAs("timeout (ms)")
+            .ofType(Long.class)
+            .defaultsTo(5000L);
+        commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
+            .withRequiredArg()
+            .describedAs("command config property file")
+            .ofType(String.class);
+        // --state is tied to --list via availableIf and its argument is optional, so a bare
+        // "--state" is accepted as well as "--state <value>".
+        stateOpt = parser.accepts("state", STATE_DOC)
+            .availableIf(listOpt)
+            .withOptionalArg()
+            .ofType(String.class);
+
+        options = parser.parse(args);
+    }
+
+    /**
+     * Delegates to CommandLineUtils to handle help/version requests and then to check that
+     * {@code --bootstrap-server} was supplied.
+     * NOTE(review): how those helpers react (printing, exiting) is defined in CommandLineUtils and is
+     * not visible here.
+     */
+    public void checkArgs() {
+        CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
list streams groups.");
+
+        CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
new file mode 100644
index 00000000000..b97bdeb753c
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.GroupListing;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListGroupsOptions;
+import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import joptsimple.OptionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link StreamsGroupCommand} that run against a mocked admin client.
+ */
+public class StreamsGroupCommandTest {
+
+    /** {@code --list} without {@code --state} returns the ids of all groups reported by the admin client. */
+    @Test
+    public void testListStreamsGroups() throws Exception {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list"};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        ListGroupsResult result = mock(ListGroupsResult.class);
+        when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
+                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)),
+                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY))
+        )));
+        when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
+        Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup));
+
+        // try-with-resources closes the service even when the condition below fails.
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs, adminClient)) {
+            Set<String> foundGroups = new HashSet<>();
+            // The message is supplied lazily so that it reports the groups actually found.
+            TestUtils.waitForCondition(() -> {
+                foundGroups.clear();
+                foundGroups.addAll(service.listStreamsGroups());
+                return Objects.equals(expectedGroups, foundGroups);
+            }, () -> "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups + ".");
+        }
+    }
+
+    /** An unknown option is rejected while the options are being parsed. */
+    @Test
+    public void testListWithUnrecognizedOption() {
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", bootstrapServer, "--list"};
+        final Exception exception = assertThrows(OptionException.class, () -> {
+            getStreamsGroupService(cgcArgs, new MockAdminClient());
+        });
+        assertEquals("frivolous-nonsense is not a recognized option", exception.getMessage());
+    }
+
+    /** Listing with states returns exactly the listings handed back by the admin client. */
+    @Test
+    public void testListStreamsGroupsWithStates() throws Exception {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String bootstrapServer = "localhost:9092";
+
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"};
+        Admin adminClient = mock(KafkaAdminClient.class);
+        ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
+        when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
+                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)),
+                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY))
+        )));
+        when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
+        Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
+                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)),
+                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY))));
+
+        // try-with-resources closes the service even when one of the conditions below fails.
+        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs, adminClient)) {
+            Set<GroupListing> foundListing = new HashSet<>();
+            TestUtils.waitForCondition(() -> {
+                foundListing.clear();
+                foundListing.addAll(service.listStreamsGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values()))));
+                return Objects.equals(expectedListing, foundListing);
+            }, () -> "Expected to show groups " + expectedListing + ", but found " + foundListing);
+
+            // Re-stub the admin client so that only the stable group is returned.
+            ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
+            when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(List.of(
+                    new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))
+            )));
+            when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
+            Set<GroupListing> expectedListingStable = Set.of(
+                    new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)));
+
+            foundListing.clear();
+
+            TestUtils.waitForCondition(() -> {
+                foundListing.clear();
+                foundListing.addAll(service.listStreamsGroupsInStates(Set.of(GroupState.STABLE)));
+                return Objects.equals(expectedListingStable, foundListing);
+            }, () -> "Expected to show groups " + expectedListingStable + ", but found " + foundListing);
+        }
+    }
+
+    /** State names are parsed case-insensitively; names outside the streams group states are rejected. */
+    @Test
+    public void testGroupStatesFromString() {
+        Set<GroupState> result = StreamsGroupCommand.groupStatesFromString("empty");
+        assertEquals(new HashSet<>(List.of(GroupState.EMPTY)), result);
+        result = StreamsGroupCommand.groupStatesFromString("EMPTY");
+        assertEquals(new HashSet<>(List.of(GroupState.EMPTY)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("notready");
+        assertEquals(new HashSet<>(List.of(GroupState.NOT_READY)), result);
+        result = StreamsGroupCommand.groupStatesFromString("notReady");
+        assertEquals(new HashSet<>(List.of(GroupState.NOT_READY)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("assigning");
+        assertEquals(new HashSet<>(List.of(GroupState.ASSIGNING)), result);
+        result = StreamsGroupCommand.groupStatesFromString("ASSIGNING");
+        assertEquals(new HashSet<>(List.of(GroupState.ASSIGNING)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("RECONCILING");
+        assertEquals(new HashSet<>(List.of(GroupState.RECONCILING)), result);
+        result = StreamsGroupCommand.groupStatesFromString("reconCILING");
+        assertEquals(new HashSet<>(List.of(GroupState.RECONCILING)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("STABLE");
+        assertEquals(new HashSet<>(List.of(GroupState.STABLE)), result);
+        result = StreamsGroupCommand.groupStatesFromString("stable");
+        assertEquals(new HashSet<>(List.of(GroupState.STABLE)), result);
+
+        result = StreamsGroupCommand.groupStatesFromString("DEAD");
+        assertEquals(new HashSet<>(List.of(GroupState.DEAD)), result);
+        result = StreamsGroupCommand.groupStatesFromString("dead");
+        assertEquals(new HashSet<>(List.of(GroupState.DEAD)), result);
+
+        assertThrow("preparingRebalance");
+        assertThrow("completingRebalance");
+        assertThrow("bad, wrong");
+        assertThrow("  bad, Stable");
+        assertThrow("   ,   ,");
+    }
+
+    /** Builds a service from {@code args} that talks to the given (mock) admin client. */
+    StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args, Admin adminClient) {
+        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+        return new StreamsGroupCommand.StreamsGroupService(opts, adminClient);
+    }
+
+    /**
+     * Asserts that {@code wrongState} is rejected and that the error message echoes the input and
+     * lists exactly the valid streams group states.
+     */
+    private static void assertThrow(final String wrongState) {
+        final Set<String> validStates = new HashSet<>(Arrays.asList("Assigning", "Dead", "Empty", "Reconciling", "Stable", "NotReady"));
+
+        final Exception exception = assertThrows(IllegalArgumentException.class, () -> StreamsGroupCommand.groupStatesFromString(wrongState));
+
+        assertTrue(exception.getMessage().contains(" Valid states are: "));
+
+        final String[] exceptionMessage = exception.getMessage().split(" Valid states are: ");
+        assertEquals("Invalid state list '" + wrongState + "'.", exceptionMessage[0]);
+        // Expected value first, so a failure is reported the right way round.
+        assertEquals(validStates, Arrays.stream(exceptionMessage[1].split(","))
+            .map(String::trim)
+            .collect(Collectors.toSet()));
+    }
+}

Reply via email to