This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new fa62bce63df KAFKA-18287: Add support for kafka-streams-groups.sh --list (#19422)
fa62bce63df is described below

commit fa62bce63df7b48a0af37be9625f952116ad35c5
Author: Alieh Saeedi <107070585+aliehsaee...@users.noreply.github.com>
AuthorDate: Wed Apr 9 17:57:34 2025 +0200

    KAFKA-18287: Add support for kafka-streams-groups.sh --list (#19422)

    Implement the core of kafka-streams-groups.sh for `KIP-1071`

    - Implement `--list` and its options: (only `--state`)

    Reviewers: Bruno Cadonna <cado...@apache.org>
---
 bin/kafka-streams-groups.sh                        |  17 ++
 .../kafka/tools/streams/StreamsGroupCommand.java   | 168 +++++++++++++++++++
 .../tools/streams/StreamsGroupCommandOptions.java  |  70 ++++++++
 .../tools/streams/StreamsGroupCommandTest.java     | 185 +++++++++++++++++++++
 4 files changed, 440 insertions(+)

diff --git a/bin/kafka-streams-groups.sh b/bin/kafka-streams-groups.sh
new file mode 100755
index 00000000000..a204f4d2c74
--- /dev/null
+++ b/bin/kafka-streams-groups.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.streams.StreamsGroupCommand "$@" diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java new file mode 100644 index 00000000000..91a22e87281 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.streams; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.GroupListing; +import org.apache.kafka.clients.admin.ListGroupsOptions; +import org.apache.kafka.clients.admin.ListGroupsResult; +import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandLineUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import joptsimple.OptionException; + +public class StreamsGroupCommand { + + public static void main(String[] args) { + StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args); + try { + opts.checkArgs(); + + // should have exactly one action + long numberOfActions = Stream.of(opts.listOpt).filter(opts.options::has).count(); + if (numberOfActions != 1) + CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list."); + + run(opts); + } catch (OptionException e) { + CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage()); + } + } + + public static void run(StreamsGroupCommandOptions opts) { + try (StreamsGroupService streamsGroupService = new StreamsGroupService(opts, Map.of())) { + if (opts.options.has(opts.listOpt)) { + streamsGroupService.listGroups(); + } else { + throw new IllegalArgumentException("Unknown action!"); + } + } catch (IllegalArgumentException e) { + CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage()); + } catch (Throwable e) { + printError("Executing streams group command failed due to " + e.getMessage(), Optional.of(e)); + 
} + } + + static Set<GroupState> groupStatesFromString(String input) { + Set<GroupState> parsedStates = + Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet()); + Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.STREAMS); + if (!validStates.containsAll(parsedStates)) { + throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + + validStates.stream().map(GroupState::toString).collect(Collectors.joining(", "))); + } + return parsedStates; + } + + public static void printError(String msg, Optional<Throwable> e) { + System.out.println("\nError: " + msg); + e.ifPresent(Throwable::printStackTrace); + } + + // Visibility for testing + static class StreamsGroupService implements AutoCloseable { + final StreamsGroupCommandOptions opts; + private final Admin adminClient; + + public StreamsGroupService(StreamsGroupCommandOptions opts, Map<String, String> configOverrides) { + this.opts = opts; + try { + this.adminClient = createAdminClient(configOverrides); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public StreamsGroupService(StreamsGroupCommandOptions opts, Admin adminClient) { + this.opts = opts; + this.adminClient = adminClient; + } + + public void listGroups() throws ExecutionException, InterruptedException { + if (opts.options.has(opts.stateOpt)) { + String stateValue = opts.options.valueOf(opts.stateOpt); + Set<GroupState> states = (stateValue == null || stateValue.isEmpty()) + ? 
Set.of() + : groupStatesFromString(stateValue); + List<GroupListing> listings = listStreamsGroupsInStates(states); + printGroupInfo(listings); + } else + listStreamsGroups().forEach(System.out::println); + } + + List<String> listStreamsGroups() { + try { + ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) + .withTypes(Set.of(GroupType.STREAMS))); + Collection<GroupListing> listings = result.all().get(); + return listings.stream().map(GroupListing::groupId).collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + List<GroupListing> listStreamsGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException { + ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) + .withTypes(Set.of(GroupType.STREAMS)) + .inGroupStates(states)); + return new ArrayList<>(result.all().get()); + } + + private void printGroupInfo(List<GroupListing> groups) { + // find proper columns width + int maxGroupLen = 15; + for (GroupListing group : groups) { + maxGroupLen = Math.max(maxGroupLen, group.groupId().length()); + } + System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", "STATE"); + for (GroupListing group : groups) { + String groupId = group.groupId(); + String state = group.groupState().orElse(GroupState.UNKNOWN).toString(); + System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, state); + } + } + + public void close() { + adminClient.close(); + } + + protected Admin createAdminClient(Map<String, String> configOverrides) throws IOException { + Properties props = opts.options.has(opts.commandConfigOpt) ? 
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)); + props.putAll(configOverrides); + return Admin.create(props); + } + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java new file mode 100644 index 00000000000..ced73b44bf1 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.streams; + +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import joptsimple.OptionSpec; + +public class StreamsGroupCommandOptions extends CommandDefaultOptions { + public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; + public static final String LIST_DOC = "List all streams groups."; + public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. 
For example, it can be used when describing the group " + + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes."; + public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client."; + public static final String STATE_DOC = "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " + + "Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and Dead."; + + public final OptionSpec<String> bootstrapServerOpt; + public final OptionSpec<Void> listOpt; + public final OptionSpec<Long> timeoutMsOpt; + public final OptionSpec<String> commandConfigOpt; + public final OptionSpec<String> stateOpt; + + + public StreamsGroupCommandOptions(String[] args) { + super(args); + + bootstrapServerOpt = parser.accepts("bootstrap-server", BOOTSTRAP_SERVER_DOC) + .withRequiredArg() + .describedAs("server to connect to") + .ofType(String.class); + listOpt = parser.accepts("list", LIST_DOC); + timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC) + .withRequiredArg() + .describedAs("timeout (ms)") + .ofType(Long.class) + .defaultsTo(5000L); + commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC) + .withRequiredArg() + .describedAs("command config property file") + .ofType(String.class); + stateOpt = parser.accepts("state", STATE_DOC) + .availableIf(listOpt) + .withOptionalArg() + .ofType(String.class); + + options = parser.parse(args); + } + + public void checkArgs() { + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list streams groups."); + + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java new file mode 100644 index 00000000000..b97bdeb753c --- /dev/null +++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.streams; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.GroupListing; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListGroupsOptions; +import org.apache.kafka.clients.admin.ListGroupsResult; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import joptsimple.OptionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static 
org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class StreamsGroupCommandTest { + + @Test + public void testListStreamsGroups() throws Exception { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list"}; + Admin adminClient = mock(KafkaAdminClient.class); + ListGroupsResult result = mock(ListGroupsResult.class); + when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( + new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)), + new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY)) + ))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result); + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs, adminClient); + Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup)); + + final Set[] foundGroups = new Set[]{Set.of()}; + TestUtils.waitForCondition(() -> { + foundGroups[0] = new HashSet<>(service.listStreamsGroups()); + return Objects.equals(expectedGroups, foundGroups[0]); + }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + "."); + service.close(); + } + + @Test + public void testListWithUnrecognizedOption() { + String bootstrapServer = "localhost:9092"; + String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", bootstrapServer, "--list"}; + final Exception exception = assertThrows(OptionException.class, () -> { + getStreamsGroupService(cgcArgs, new MockAdminClient()); + }); + assertEquals("frivolous-nonsense is not a recognized option", exception.getMessage()); + } + + @Test + public void testListStreamsGroupsWithStates() throws Exception { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String 
bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"}; + Admin adminClient = mock(KafkaAdminClient.class); + ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class); + when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( + new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)), + new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY)) + ))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates); + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs, adminClient); + Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList( + new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)), + new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY)))); + + final Set[] foundListing = new Set[]{Set.of()}; + TestUtils.waitForCondition(() -> { + foundListing[0] = new HashSet<>(service.listStreamsGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values())))); + return Objects.equals(expectedListing, foundListing[0]); + }, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]); + + ListGroupsResult resultWithStableState = mock(ListGroupsResult.class); + when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(List.of( + new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)) + ))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState); + Set<GroupListing> expectedListingStable = Set.of( + new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))); + + foundListing[0] = Set.of(); + + TestUtils.waitForCondition(() -> { + 
foundListing[0] = new HashSet<>(service.listStreamsGroupsInStates(Set.of(GroupState.STABLE))); + return Objects.equals(expectedListingStable, foundListing[0]); + }, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]); + service.close(); + } + + @Test + public void testGroupStatesFromString() { + Set<GroupState> result = StreamsGroupCommand.groupStatesFromString("empty"); + assertEquals(new HashSet<>(List.of(GroupState.EMPTY)), result); + result = StreamsGroupCommand.groupStatesFromString("EMPTY"); + assertEquals(new HashSet<>(List.of(GroupState.EMPTY)), result); + + result = StreamsGroupCommand.groupStatesFromString("notready"); + assertEquals(new HashSet<>(List.of(GroupState.NOT_READY)), result); + result = StreamsGroupCommand.groupStatesFromString("notReady"); + assertEquals(new HashSet<>(List.of(GroupState.NOT_READY)), result); + + result = StreamsGroupCommand.groupStatesFromString("assigning"); + assertEquals(new HashSet<>(List.of(GroupState.ASSIGNING)), result); + result = StreamsGroupCommand.groupStatesFromString("ASSIGNING"); + assertEquals(new HashSet<>(List.of(GroupState.ASSIGNING)), result); + + result = StreamsGroupCommand.groupStatesFromString("RECONCILING"); + assertEquals(new HashSet<>(List.of(GroupState.RECONCILING)), result); + result = StreamsGroupCommand.groupStatesFromString("reconCILING"); + assertEquals(new HashSet<>(List.of(GroupState.RECONCILING)), result); + + result = StreamsGroupCommand.groupStatesFromString("STABLE"); + assertEquals(new HashSet<>(List.of(GroupState.STABLE)), result); + result = StreamsGroupCommand.groupStatesFromString("stable"); + assertEquals(new HashSet<>(List.of(GroupState.STABLE)), result); + + result = StreamsGroupCommand.groupStatesFromString("DEAD"); + assertEquals(new HashSet<>(List.of(GroupState.DEAD)), result); + result = StreamsGroupCommand.groupStatesFromString("dead"); + assertEquals(new HashSet<>(List.of(GroupState.DEAD)), result); + + assertThrow("preparingRebalance"); + 
assertThrow("completingRebalance"); + assertThrow("bad, wrong"); + assertThrow(" bad, Stable"); + assertThrow(" , ,"); + } + + StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args, Admin adminClient) { + StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args); + return new StreamsGroupCommand.StreamsGroupService(opts, adminClient); + } + + private static void assertThrow(final String wrongState) { + final Set<String> validStates = new HashSet<>(Arrays.asList("Assigning", "Dead", "Empty", "Reconciling", "Stable", "NotReady")); + + final Exception exception = assertThrows(IllegalArgumentException.class, () -> StreamsGroupCommand.groupStatesFromString(wrongState)); + + assertTrue(exception.getMessage().contains(" Valid states are: ")); + + final String[] exceptionMessage = exception.getMessage().split(" Valid states are: "); + assertEquals("Invalid state list '" + wrongState + "'.", exceptionMessage[0]); + assertEquals(Arrays.stream(exceptionMessage[1].split(",")) + .map(String::trim) + .collect(Collectors.toSet()), validStates); + } +}