This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 44edff4bc7a KAFKA-19537: Improve Exit Code Handling in
StreamsGroupCommand (#20293)
44edff4bc7a is described below
commit 44edff4bc7ad38cc9134cbab730c1b71869e3edd
Author: Chih-Yuan Chien <[email protected]>
AuthorDate: Wed Oct 15 09:46:43 2025 +0800
KAFKA-19537: Improve Exit Code Handling in StreamsGroupCommand (#20293)
This pull request enhances the StreamsGroupCommand to return appropriate
exit codes, improving its reliability for scripting and automation.
- Exception handling is now consistently managed within the execute()
helper method.
- main() now calls Exit.exit() with the code returned by execute(), i.e.
Exit.exit(0) on success and Exit.exit(1) on failure.
- The unused method
`org.apache.kafka.tools.streams.StreamsGroupCommandTest#describeTopicsResult`
is removed.
Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../kafka/tools/streams/StreamsGroupCommand.java | 34 ++++++++++----
.../streams/DeleteStreamsGroupOffsetTest.java | 1 +
.../tools/streams/DescribeStreamsGroupTest.java | 7 +--
.../kafka/tools/streams/ListStreamsGroupTest.java | 3 +-
.../tools/streams/ResetStreamsGroupOffsetTest.java | 2 +
.../tools/streams/StreamsGroupCommandTest.java | 52 +++++++++++++++++-----
6 files changed, 75 insertions(+), 24 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 0c54f6c53f9..897615deef8 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.tools.OffsetsUtils;
@@ -83,8 +84,14 @@ public class StreamsGroupCommand {
static final String MISSING_COLUMN_VALUE = "-";
public static void main(String[] args) {
- StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+ Exit.exit(execute(args));
+ }
+
+ public static int execute(String[] args) {
+ StreamsGroupCommandOptions opts = null;
+ int exitCode = 0;
try {
+ opts = new StreamsGroupCommandOptions(args);
opts.checkArgs();
// should have exactly one action
long numberOfActions = Stream.of(
@@ -95,15 +102,28 @@ public class StreamsGroupCommand {
opts.deleteOffsetsOpt
).filter(opts.options::has).count();
if (numberOfActions != 1)
- CommandLineUtils.printUsageAndExit(opts.parser, "Command must
include exactly one action: --list, --describe, --delete, --reset-offsets, or
--delete-offsets.");
+ throw new IllegalArgumentException("Command must include
exactly one action: --list, --describe, --delete, --reset-offsets, or
--delete-offsets.");
run(opts);
- } catch (OptionException e) {
- CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
+ } catch (IllegalArgumentException | OptionException e) {
+ System.err.println(e.getMessage());
+ if (opts != null) {
+ try {
+ opts.parser.printHelpOn(System.err);
+ } catch (IOException ex) {
+ printError(e.getMessage(), Optional.of(ex));
+ }
+ }
+ exitCode = 1;
+ } catch (Throwable e) {
+ printError("Executing streams group command failed due to " +
e.getMessage(), Optional.of(e));
+ exitCode = 1;
}
+
+ return exitCode;
}
- public static void run(StreamsGroupCommandOptions opts) {
+ public static void run(StreamsGroupCommandOptions opts) throws
ExecutionException, InterruptedException {
try (StreamsGroupService streamsGroupService = new
StreamsGroupService(opts, Map.of())) {
if (opts.options.has(opts.listOpt)) {
streamsGroupService.listGroups();
@@ -123,10 +143,6 @@ public class StreamsGroupCommand {
} else {
throw new IllegalArgumentException("Unknown action!");
}
- } catch (IllegalArgumentException e) {
- CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
- } catch (Throwable e) {
- printError("Executing streams group command failed due to " +
e.getMessage(), Optional.of(e));
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
index b6e32e8ebda..e932e89a908 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
@@ -154,6 +154,7 @@ public class DeleteStreamsGroupOffsetTest {
getStreamsGroupService(args);
} finally {
assertTrue(exited.get());
+ Exit.resetExitProcedure();
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 6d7ea57b8ac..723e928e770 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -116,6 +116,7 @@ public class DescribeStreamsGroupTest {
getStreamsGroupService(args);
} finally {
assertTrue(exited.get());
+ Exit.resetExitProcedure();
}
}
@@ -290,7 +291,7 @@ public class DescribeStreamsGroupTest {
}
private static void validateDescribeOutput(List<String> args, String
errorMessage) {
- String output = ToolsTestUtils.grabConsoleOutput(() ->
StreamsGroupCommand.main(args.toArray(new String[0])));
+ String output = ToolsTestUtils.grabConsoleOutput(() -> assertEquals(1,
StreamsGroupCommand.execute(args.toArray(new String[0]))));
assertEquals(errorMessage, output.trim());
}
@@ -302,7 +303,7 @@ public class DescribeStreamsGroupTest {
) throws InterruptedException {
final AtomicReference<String> out = new AtomicReference<>("");
TestUtils.waitForCondition(() -> {
- String output = ToolsTestUtils.grabConsoleOutput(() ->
StreamsGroupCommand.main(args.toArray(new String[0])));
+ String output = ToolsTestUtils.grabConsoleOutput(() ->
assertEquals(0, StreamsGroupCommand.execute(args.toArray(new String[0]))));
out.set(output);
String[] lines = output.split("\n");
@@ -337,7 +338,7 @@ public class DescribeStreamsGroupTest {
) throws InterruptedException {
final AtomicReference<String> out = new AtomicReference<>("");
TestUtils.waitForCondition(() -> {
- String output = ToolsTestUtils.grabConsoleOutput(() ->
StreamsGroupCommand.main(args.toArray(new String[0])));
+ String output = ToolsTestUtils.grabConsoleOutput(() ->
assertEquals(0, StreamsGroupCommand.execute(args.toArray(new String[0]))));
out.set(output);
String[] lines = output.split("\n");
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
index 57dc8941f2b..ecbab7b7269 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
import joptsimple.OptionException;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(600)
@Tag("integration")
@@ -223,7 +224,7 @@ public class ListStreamsGroupTest {
) throws InterruptedException {
final AtomicReference<String> out = new AtomicReference<>("");
TestUtils.waitForCondition(() -> {
- String output = ToolsTestUtils.grabConsoleOutput(() ->
StreamsGroupCommand.main(args.toArray(new String[0])));
+ String output = ToolsTestUtils.grabConsoleOutput(() ->
assertEquals(0, StreamsGroupCommand.execute(args.toArray(new String[0]))));
out.set(output);
String[] lines = output.split("\n");
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
index ed888abe378..d5a86c70305 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
@@ -152,6 +152,7 @@ public class ResetStreamsGroupOffsetTest {
getStreamsGroupService(args);
} finally {
assertTrue(exited.get());
+ Exit.resetExitProcedure();
}
}
@@ -168,6 +169,7 @@ public class ResetStreamsGroupOffsetTest {
getStreamsGroupService(args);
} finally {
assertTrue(exited.get());
+ Exit.resetExitProcedure();
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index c91a4b2090b..cd1b66d993f 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
@@ -49,10 +48,10 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
+import org.mockito.MockedStatic;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -60,9 +59,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import joptsimple.OptionException;
@@ -77,6 +76,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -477,6 +477,44 @@ public class StreamsGroupCommandTest {
service.close();
}
+ @Test
+ public void testExitCodeOnInvalidOption() {
+ String[] args = new String[]{"--invalid-option"};
+ assertEquals(1, StreamsGroupCommand.execute(args));
+ }
+
+ @Test
+ public void testExitCodeOnIllegalArguments() {
+ String[] args = new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS};
+ assertEquals(1, StreamsGroupCommand.execute(args));
+ }
+
+ @Test
+ public void testExitCodeOnExecutionException() {
+ try (MockedStatic<StreamsGroupCommand> mockedStreamGroupCommand =
mockStatic(StreamsGroupCommand.class)) {
+ String[] args = new String[]{"--bootstrap-server",
BOOTSTRAP_SERVERS, "--list"};
+ mockedStreamGroupCommand.when(() ->
StreamsGroupCommand.execute(any(String[].class))).thenCallRealMethod();
+ mockedStreamGroupCommand.when(() ->
StreamsGroupCommand.run(any(StreamsGroupCommandOptions.class))).thenThrow(new
ExecutionException("ExecutionException", new RuntimeException()));
+
+ assertEquals(1, StreamsGroupCommand.execute(args));
+
+ mockedStreamGroupCommand.verify(() ->
StreamsGroupCommand.run(any(StreamsGroupCommandOptions.class)));
+ }
+ }
+
+ @Test
+ public void testExitCodeOnInterruptedException() {
+ try (MockedStatic<StreamsGroupCommand> mockedStreamGroupCommand =
mockStatic(StreamsGroupCommand.class)) {
+ String[] args = new String[]{"--bootstrap-server",
BOOTSTRAP_SERVERS, "--list"};
+ mockedStreamGroupCommand.when(() ->
StreamsGroupCommand.execute(any(String[].class))).thenCallRealMethod();
+ mockedStreamGroupCommand.when(() ->
StreamsGroupCommand.run(any(StreamsGroupCommandOptions.class))).thenThrow(new
InterruptedException("InterruptedException"));
+
+ assertEquals(1, StreamsGroupCommand.execute(args));
+
+ mockedStreamGroupCommand.verify(() ->
StreamsGroupCommand.run(any(StreamsGroupCommandOptions.class)));
+ }
+ }
+
private ListGroupsResult listGroupResult(String groupId) {
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(
@@ -529,14 +567,6 @@ public class StreamsGroupCommandTest {
return new DescribeStreamsGroupsResult(Map.of(groupId, future));
}
- private DescribeTopicsResult describeTopicsResult(Collection<String>
topics, int numOfPartitions) {
- var topicDescriptions =
topics.stream().collect(Collectors.toMap(Function.identity(),
- topic -> new TopicDescription(topic, false, IntStream.range(0,
numOfPartitions)
- .mapToObj(i -> new TopicPartitionInfo(i, null, List.of(),
List.of()))
- .toList())));
- return AdminClientTestUtils.describeTopicsResult(topicDescriptions);
- }
-
private ListOffsetsResult listOffsetsResult() {
List<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("topic1", 0));