This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 44edff4bc7a KAFKA-19537: Improve Exit Code Handling in StreamsGroupCommand (#20293)
44edff4bc7a is described below

commit 44edff4bc7ad38cc9134cbab730c1b71869e3edd
Author: Chih-Yuan Chien <[email protected]>
AuthorDate: Wed Oct 15 09:46:43 2025 +0800

    KAFKA-19537: Improve Exit Code Handling in StreamsGroupCommand (#20293)
    
    This pull request enhances the StreamsGroupCommand to return appropriate
    exit codes, improving its reliability for scripting and automation.
    
    - Exception handling is now consistently managed within the execute()
    helper method.
    - main() now calls Exit.exit() with the code returned by execute():
    0 on success and 1 on failure.
    - Remove unused method
    `org.apache.kafka.tools.streams.StreamsGroupCommandTest#describeTopicsResult`
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax
    <[email protected]>
---
 .../kafka/tools/streams/StreamsGroupCommand.java   | 34 ++++++++++----
 .../streams/DeleteStreamsGroupOffsetTest.java      |  1 +
 .../tools/streams/DescribeStreamsGroupTest.java    |  7 +--
 .../kafka/tools/streams/ListStreamsGroupTest.java  |  3 +-
 .../tools/streams/ResetStreamsGroupOffsetTest.java |  2 +
 .../tools/streams/StreamsGroupCommandTest.java     | 52 +++++++++++++++++-----
 6 files changed, 75 insertions(+), 24 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 0c54f6c53f9..897615deef8 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -48,6 +48,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
 import org.apache.kafka.tools.OffsetsUtils;
@@ -83,8 +84,14 @@ public class StreamsGroupCommand {
     static final String MISSING_COLUMN_VALUE = "-";
 
     public static void main(String[] args) {
-        StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
+        Exit.exit(execute(args));
+    }
+
+    public static int execute(String[] args) {
+        StreamsGroupCommandOptions opts = null;
+        int exitCode = 0;
         try {
+            opts = new StreamsGroupCommandOptions(args);
             opts.checkArgs();
             // should have exactly one action
             long numberOfActions = Stream.of(
@@ -95,15 +102,28 @@ public class StreamsGroupCommand {
                 opts.deleteOffsetsOpt
             ).filter(opts.options::has).count();
             if (numberOfActions != 1)
-                CommandLineUtils.printUsageAndExit(opts.parser, "Command must 
include exactly one action: --list, --describe, --delete, --reset-offsets, or 
--delete-offsets.");
+                throw new IllegalArgumentException("Command must include 
exactly one action: --list, --describe, --delete, --reset-offsets, or 
--delete-offsets.");
 
             run(opts);
-        } catch (OptionException e) {
-            CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
+        } catch (IllegalArgumentException | OptionException e) {
+            System.err.println(e.getMessage());
+            if (opts != null) {
+                try {
+                    opts.parser.printHelpOn(System.err);
+                } catch (IOException ex) {
+                    printError(e.getMessage(), Optional.of(ex));
+                }
+            }
+            exitCode = 1;
+        } catch (Throwable e) {
+            printError("Executing streams group command failed due to " + 
e.getMessage(), Optional.of(e));
+            exitCode = 1;
         }
+
+        return exitCode;
     }
 
-    public static void run(StreamsGroupCommandOptions opts) {
+    public static void run(StreamsGroupCommandOptions opts) throws 
ExecutionException, InterruptedException {
         try (StreamsGroupService streamsGroupService = new 
StreamsGroupService(opts, Map.of())) {
             if (opts.options.has(opts.listOpt)) {
                 streamsGroupService.listGroups();
@@ -123,10 +143,6 @@ public class StreamsGroupCommand {
             } else {
                 throw new IllegalArgumentException("Unknown action!");
             }
-        } catch (IllegalArgumentException e) {
-            CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
-        } catch (Throwable e) {
-            printError("Executing streams group command failed due to " + 
e.getMessage(), Optional.of(e));
         }
     }
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
index b6e32e8ebda..e932e89a908 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/DeleteStreamsGroupOffsetTest.java
@@ -154,6 +154,7 @@ public class DeleteStreamsGroupOffsetTest {
             getStreamsGroupService(args);
         } finally {
             assertTrue(exited.get());
+            Exit.resetExitProcedure();
         }
     }
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 6d7ea57b8ac..723e928e770 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -116,6 +116,7 @@ public class DescribeStreamsGroupTest {
             getStreamsGroupService(args);
         } finally {
             assertTrue(exited.get());
+            Exit.resetExitProcedure();
         }
     }
 
@@ -290,7 +291,7 @@ public class DescribeStreamsGroupTest {
     }
 
     private static void validateDescribeOutput(List<String> args, String 
errorMessage) {
-        String output = ToolsTestUtils.grabConsoleOutput(() -> 
StreamsGroupCommand.main(args.toArray(new String[0])));
+        String output = ToolsTestUtils.grabConsoleOutput(() -> assertEquals(1, 
StreamsGroupCommand.execute(args.toArray(new String[0]))));
         assertEquals(errorMessage, output.trim());
     }
 
@@ -302,7 +303,7 @@ public class DescribeStreamsGroupTest {
     ) throws InterruptedException {
         final AtomicReference<String> out = new AtomicReference<>("");
         TestUtils.waitForCondition(() -> {
-            String output = ToolsTestUtils.grabConsoleOutput(() -> 
StreamsGroupCommand.main(args.toArray(new String[0])));
+            String output = ToolsTestUtils.grabConsoleOutput(() -> 
assertEquals(0, StreamsGroupCommand.execute(args.toArray(new String[0]))));
             out.set(output);
 
             String[] lines = output.split("\n");
@@ -337,7 +338,7 @@ public class DescribeStreamsGroupTest {
     ) throws InterruptedException {
         final AtomicReference<String> out = new AtomicReference<>("");
         TestUtils.waitForCondition(() -> {
-            String output = ToolsTestUtils.grabConsoleOutput(() -> 
StreamsGroupCommand.main(args.toArray(new String[0])));
+            String output = ToolsTestUtils.grabConsoleOutput(() -> 
assertEquals(0, StreamsGroupCommand.execute(args.toArray(new String[0]))));
             out.set(output);
 
             String[] lines = output.split("\n");
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java 
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
index 57dc8941f2b..ecbab7b7269 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/ListStreamsGroupTest.java
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
 import joptsimple.OptionException;
 
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(600)
 @Tag("integration")
@@ -223,7 +224,7 @@ public class ListStreamsGroupTest {
     ) throws InterruptedException {
         final AtomicReference<String> out = new AtomicReference<>("");
         TestUtils.waitForCondition(() -> {
-            String output = ToolsTestUtils.grabConsoleOutput(() -> 
StreamsGroupCommand.main(args.toArray(new String[0])));
+            String output = ToolsTestUtils.grabConsoleOutput(() -> 
assertEquals(0, StreamsGroupCommand.execute(args.toArray(new String[0]))));
             out.set(output);
 
             String[] lines = output.split("\n");
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
index ed888abe378..d5a86c70305 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
@@ -152,6 +152,7 @@ public class ResetStreamsGroupOffsetTest {
             getStreamsGroupService(args);
         } finally {
             assertTrue(exited.get());
+            Exit.resetExitProcedure();
         }
     }
 
@@ -168,6 +169,7 @@ public class ResetStreamsGroupOffsetTest {
             getStreamsGroupService(args);
         } finally {
             assertTrue(exited.get());
+            Exit.resetExitProcedure();
         }
     }
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index c91a4b2090b..cd1b66d993f 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.tools.streams;
 
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientTestUtils;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
@@ -49,10 +48,10 @@ import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatchers;
+import org.mockito.MockedStatic;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -60,9 +59,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import joptsimple.OptionException;
 
@@ -77,6 +76,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -477,6 +477,44 @@ public class StreamsGroupCommandTest {
         service.close();
     }
 
+    @Test
+    public void testExitCodeOnInvalidOption() {
+        String[] args = new String[]{"--invalid-option"};
+        assertEquals(1, StreamsGroupCommand.execute(args));
+    }
+
+    @Test
+    public void testExitCodeOnIllegalArguments() {
+        String[] args = new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS};
+        assertEquals(1, StreamsGroupCommand.execute(args));
+    }
+
+    @Test
+    public void testExitCodeOnExecutionException() {
+        try (MockedStatic<StreamsGroupCommand> mockedStreamGroupCommand = 
mockStatic(StreamsGroupCommand.class)) {
+            String[] args = new String[]{"--bootstrap-server", 
BOOTSTRAP_SERVERS, "--list"};
+            mockedStreamGroupCommand.when(() -> 
StreamsGroupCommand.execute(any(String[].class))).thenCallRealMethod();
+            mockedStreamGroupCommand.when(() -> 
StreamsGroupCommand.run(any(StreamsGroupCommandOptions.class))).thenThrow(new 
ExecutionException("ExecutionException", new RuntimeException()));
+            
+            assertEquals(1, StreamsGroupCommand.execute(args));
+            
+            mockedStreamGroupCommand.verify(() -> 
StreamsGroupCommand.run(any(StreamsGroupCommandOptions.class)));
+        }
+    }
+
+    @Test
+    public void testExitCodeOnInterruptedException() {
+        try (MockedStatic<StreamsGroupCommand> mockedStreamGroupCommand = 
mockStatic(StreamsGroupCommand.class)) {
+            String[] args = new String[]{"--bootstrap-server", 
BOOTSTRAP_SERVERS, "--list"};
+            mockedStreamGroupCommand.when(() -> 
StreamsGroupCommand.execute(any(String[].class))).thenCallRealMethod();
+            mockedStreamGroupCommand.when(() -> 
StreamsGroupCommand.run(any(StreamsGroupCommandOptions.class))).thenThrow(new 
InterruptedException("InterruptedException"));
+            
+            assertEquals(1, StreamsGroupCommand.execute(args));
+            
+            mockedStreamGroupCommand.verify(() -> 
StreamsGroupCommand.run(any(StreamsGroupCommandOptions.class)));
+        }
+    }
+
     private ListGroupsResult listGroupResult(String groupId) {
         ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
         
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(
@@ -529,14 +567,6 @@ public class StreamsGroupCommandTest {
         return new DescribeStreamsGroupsResult(Map.of(groupId, future));
     }
 
-    private DescribeTopicsResult describeTopicsResult(Collection<String> 
topics, int numOfPartitions) {
-        var topicDescriptions = 
topics.stream().collect(Collectors.toMap(Function.identity(),
-            topic -> new TopicDescription(topic, false, IntStream.range(0, 
numOfPartitions)
-                .mapToObj(i -> new TopicPartitionInfo(i, null, List.of(), 
List.of()))
-                .toList())));
-        return AdminClientTestUtils.describeTopicsResult(topicDescriptions);
-    }
-
     private ListOffsetsResult listOffsetsResult() {
         List<TopicPartition> topicPartitions = new ArrayList<>();
         topicPartitions.add(new TopicPartition("topic1", 0));

Reply via email to