This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new fe8160e52cc MINOR: Require --dry-run or --execute for reset-offsets in 
streams-groups tool (#20933)
fe8160e52cc is described below

commit fe8160e52cc79225466fabee1b4f6964ab2e4c4d
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sat Nov 22 12:06:26 2025 +0800

    MINOR: Require --dry-run or --execute for reset-offsets in streams-groups 
tool (#20933)
    
    As discussed in
    https://github.com/apache/kafka/pull/20867#discussion_r2520631578 the
    streams-groups tool (KIP-1071) states:
    
    > Fails if neither '--dry-run' nor '--execute' is specified.
    
    This commit corrects the behavior in the **streams** group tool to align
    with this specification.
    
    Additionally, fixes `RESET_OFFSETS_DOC` for the **share** group option
    because the **share** group tool also requires one of these options.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../group/ConsumerGroupCommandOptions.java         |  6 ++--
 .../consumer/group/ShareGroupCommandOptions.java   |  5 ++--
 .../tools/streams/StreamsGroupCommandOptions.java  | 13 ++++-----
 .../tools/streams/ResetStreamsGroupOffsetTest.java | 33 ++++++++++++++++------
 4 files changed, 35 insertions(+), 22 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index 2d51d0525ac..94cc72aa097 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -249,9 +249,9 @@ public class ConsumerGroupCommandOptions extends 
CommandDefaultOptions {
 
             if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
                 System.err.println("WARN: No action will be performed as the 
--execute option is missing. " +
-                    "In a future major release, the default behavior of this 
command will be to prompt the user before " +
-                    "executing the reset rather than doing a dry run. You 
should add the --dry-run option explicitly " +
-                    "if you are scripting this command and want to keep the 
current default behavior without prompting.");
+                        "In version 5.0, this command will require either 
--dry-run or --execute to be specified. " +
+                        "You should add the --dry-run option explicitly if you 
are scripting this command and want to " +
+                        "keep the current default behavior.");
             }
 
             if (!options.has(groupOpt) && !options.has(allGroupsOpt))
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index e85822d4971..8bb50ba3446 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -47,9 +47,10 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
         "to specify the maximum amount of time in milliseconds to wait before 
the group stabilizes.";
     private static final String COMMAND_CONFIG_DOC = "Property file containing 
configs to be passed to Admin Client.";
     private static final String RESET_OFFSETS_DOC = "Reset offsets of share 
group. Supports one share group at the time, and instances must be inactive." + 
NL +
-        "Has 2 execution options: --dry-run (the default) to plan which 
offsets to reset, and --execute to reset the offsets. " + NL +
+        "Has 2 execution options: --dry-run to plan which offsets to reset, 
and --execute to reset the offsets. " + NL +
         "You must choose one of the following reset specifications: 
--to-datetime, --to-earliest, --to-latest." + NL +
-        "To define the scope use --all-topics or --topic.";
+        "To define the scope use --all-topics or --topic." + NL +
+        "Fails if neither '--dry-run' nor '--execute' is specified.";
     private static final String DRY_RUN_DOC = "Only show results without 
executing changes on share groups. Supported operations: reset-offsets.";
     private static final String EXECUTE_DOC = "Execute operation. Supported 
operations: reset-offsets.";
     private static final String RESET_TO_DATETIME_DOC = "Reset offsets to 
offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'";
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index a69e0bbe3a4..058485727e5 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -51,11 +51,12 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
     private static final String OFFSETS_DOC = "Describe the group and list all 
topic partitions in the group along with their offset information." +
         "This is the default sub-action and may be used with the '--describe' 
option only.";
     private static final String RESET_OFFSETS_DOC = "Reset offsets of streams 
group. The instances should be inactive." + NL +
-        "Has 2 execution options: --dry-run (the default) to plan which 
offsets to reset, and --execute to update the offsets." + NL +
+        "Has 2 execution options: --dry-run to plan which offsets to reset, 
and --execute to update the offsets." + NL +
         "If you use --execute, all internal topics linked to the group will 
also be deleted." + NL +
         "You must choose one of the following reset specifications: 
--to-datetime, --by-duration, --to-earliest, " +
         "--to-latest, --shift-by, --from-file, --to-current, --to-offset." + 
NL +
-        "To define the scope use --all-input-topics or --input-topic. One 
scope must be specified unless you use '--from-file'.";
+        "To define the scope use --all-input-topics or --input-topic. One 
scope must be specified unless you use '--from-file'." + NL +
+        "Fails if neither '--dry-run' nor '--execute' is specified.";
     private static final String DRY_RUN_DOC = "Only show results without 
executing changes on streams group. Supported operations: reset-offsets.";
     private static final String EXECUTE_DOC = "Execute operation. Supported 
operations: reset-offsets.";
     private static final String EXPORT_DOC = "Export operation execution to a 
CSV file. Supported operations: reset-offsets.";
@@ -269,12 +270,8 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
         if (options.has(dryRunOpt) && options.has(executeOpt))
             CommandLineUtils.printUsageAndExit(parser, "Option " + 
resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt);
 
-        if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
-            System.err.println("WARN: No action will be performed as the 
--execute option is missing. " +
-                "In a future major release, the default behavior of this 
command will be to prompt the user before " +
-                "executing the reset rather than doing a dry run. You should 
add the --dry-run option explicitly " +
-                "if you are scripting this command and want to keep the 
current default behavior without prompting.");
-        }
+        if (!options.has(dryRunOpt) && !options.has(executeOpt))
+            CommandLineUtils.printUsageAndExit(parser, "Option " + 
resetOffsetsOpt + " takes the option: " + executeOpt + " or " + dryRunOpt);
 
         if (!options.has(groupOpt) && !options.has(allGroupsOpt))
             CommandLineUtils.printUsageAndExit(parser,
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
index d5a86c70305..219335dc200 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
@@ -141,7 +141,7 @@ public class ResetStreamsGroupOffsetTest {
 
     @Test
     public void testResetOffsetsWithoutGroupOption() {
-        final String[] args = new String[]{"--bootstrap-server", 
bootstrapServers, "--reset-offsets", "--to-offset", "5"};
+        final String[] args = new String[]{"--bootstrap-server", 
bootstrapServers, "--reset-offsets", "--dry-run", "--to-offset", "5"};
         AtomicBoolean exited = new AtomicBoolean(false);
         Exit.setExitProcedure(((statusCode, message) -> {
             assertNotEquals(0, statusCode);
@@ -156,9 +156,26 @@ public class ResetStreamsGroupOffsetTest {
         }
     }
 
+    @Test
+    public void testResetOffsetsWithoutDryRunOrExecuteOption() {
+        final String[] args = new String[]{"--bootstrap-server", 
bootstrapServers, "--reset-offsets", "--all-groups", "--all-input-topics", 
"--to-offset", "5"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [reset-offsets] takes the 
option: [execute] or [dry-run]"));
+            exited.set(true);
+        }));
+        try {
+            getStreamsGroupService(args);
+        } finally {
+            assertTrue(exited.get());
+            Exit.resetExitProcedure();
+        }
+    }
+
     @Test
     public void testResetOffsetsWithDeleteInternalTopicsOption() {
-        final String[] args = new String[]{"--bootstrap-server", 
bootstrapServers, "--reset-offsets", "--all-groups", "--all-input-topics", 
"--to-offset", "5", "--delete-all-internal-topics"};
+        final String[] args = new String[]{"--bootstrap-server", 
bootstrapServers, "--reset-offsets", "--dry-run", "--all-groups", 
"--all-input-topics", "--to-offset", "5", "--delete-all-internal-topics"};
         AtomicBoolean exited = new AtomicBoolean(false);
         Exit.setExitProcedure(((statusCode, message) -> {
             assertNotEquals(0, statusCode);
@@ -247,7 +264,7 @@ public class ResetStreamsGroupOffsetTest {
         resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 20L, 0L, 
0, 1);
 
         // export to file
-        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-offset", 
"5", "--export"};
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, 
"--to-offset", "5", "--export"};
         File file = TestUtils.tempFile("reset", ".csv");
         Map<TopicPartition, Long> exp = Map.of(new TopicPartition(topic1, 0), 
5L, new TopicPartition(topic1, 1), 5L);
         try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args)) {
@@ -256,7 +273,7 @@ public class ResetStreamsGroupOffsetTest {
 
             assertEquals(exp, toOffsetMap(exportedOffsets.get(appId)));
         }
-        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--reset-offsets", "--group", appId, "--input-topic", topic1, "--from-file", 
file.getCanonicalPath()};
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, 
"--from-file", file.getCanonicalPath()};
         try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args)) {
             Map<String, Map<TopicPartition, OffsetAndMetadata>> 
importedOffsets = service.resetOffsets();
             assertEquals(exp, toOffsetMap(importedOffsets.get(appId)));
@@ -279,7 +296,7 @@ public class ResetStreamsGroupOffsetTest {
             new TopicPartition(topic1, 1), 5L,
             new TopicPartition(topic2, 1), 5L);
 
-        resetOffsetsAndAssert(args, appId, List.of(topic1, topic2), 
expectedOffsets,
+        resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, List.of(topic1, 
topic2), expectedOffsets,
             Map.of(
                 new TopicPartition(topic1, 0), 10L,
                 new TopicPartition(topic1, 1), 10L,
@@ -307,7 +324,7 @@ public class ResetStreamsGroupOffsetTest {
         resetForNextTest(appId, 10L, topic1, topic2);
 
         // export to file
-        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--reset-offsets", "--group", appId, "--all-input-topics", "--to-offset", "5", 
"--export"};
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--reset-offsets", "--dry-run", "--group", appId, "--all-input-topics", 
"--to-offset", "5", "--export"};
         file = TestUtils.tempFile("reset-all", ".csv");
         exp = Map.of(new TopicPartition(topic1, 0), 5L,
             new TopicPartition(topic1, 1), 5L,
@@ -319,7 +336,7 @@ public class ResetStreamsGroupOffsetTest {
 
             assertEquals(exp, toOffsetMap(exportedOffsets.get(appId)));
         }
-        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--reset-offsets", "--group", appId, "--input-topic", topic1, "--from-file", 
file.getCanonicalPath()};
+        args = new String[]{"--bootstrap-server", bootstrapServers, 
"--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1, 
"--from-file", file.getCanonicalPath()};
         try (StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args)) {
             Map<String, Map<TopicPartition, OffsetAndMetadata>> 
importedOffsets = service.resetOffsets();
 
@@ -577,7 +594,6 @@ public class ResetStreamsGroupOffsetTest {
                                                           long expectedOffset,
                                                           long 
expectedCommittedOffset,
                                                           int... partitions) 
throws ExecutionException, InterruptedException {
-        resetOffsetsAndAssert(args, appId, topic, expectedOffset, 
expectedCommittedOffset, partitions);
         resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic,  
expectedOffset, expectedCommittedOffset, partitions);
         resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic, 
expectedOffset, expectedOffset, partitions);
     }
@@ -588,7 +604,6 @@ public class ResetStreamsGroupOffsetTest {
                                        String topic2,
                                        long expectedOffset,
                                        long expectedCommittedOffset) throws 
ExecutionException, InterruptedException {
-        resetOffsetsAndAssert(args, appId, topic1, topic2, expectedOffset, 
expectedCommittedOffset);
         resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic1, topic2, 
expectedOffset, expectedCommittedOffset);
         resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic1, topic2, 
expectedOffset, expectedOffset);
     }

Reply via email to