This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new fe8160e52cc MINOR: Require --dry-run or --execute for reset-offsets in
streams-groups tool (#20933)
fe8160e52cc is described below
commit fe8160e52cc79225466fabee1b4f6964ab2e4c4d
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sat Nov 22 12:06:26 2025 +0800
MINOR: Require --dry-run or --execute for reset-offsets in streams-groups
tool (#20933)
As discussed in
https://github.com/apache/kafka/pull/20867#discussion_r2520631578 the
streams-groups tool (KIP-1071) states:
> Fails if neither '--dry-run' nor '--execute' is specified.
This commit corrects the behavior in the **streams** group tool to align
with this specification.
Additionally, fixes `RESET_OFFSETS_DOC` for the **share** group option
because the **share** group tool also requires one of these options.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../group/ConsumerGroupCommandOptions.java | 6 ++--
.../consumer/group/ShareGroupCommandOptions.java | 5 ++--
.../tools/streams/StreamsGroupCommandOptions.java | 13 ++++-----
.../tools/streams/ResetStreamsGroupOffsetTest.java | 33 ++++++++++++++++------
4 files changed, 35 insertions(+), 22 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index 2d51d0525ac..94cc72aa097 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -249,9 +249,9 @@ public class ConsumerGroupCommandOptions extends
CommandDefaultOptions {
if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
System.err.println("WARN: No action will be performed as the
--execute option is missing. " +
- "In a future major release, the default behavior of this
command will be to prompt the user before " +
- "executing the reset rather than doing a dry run. You
should add the --dry-run option explicitly " +
- "if you are scripting this command and want to keep the
current default behavior without prompting.");
+ "In version 5.0, this command will require either
--dry-run or --execute to be specified. " +
+ "You should add the --dry-run option explicitly if you
are scripting this command and want to " +
+ "keep the current default behavior without
prompting.");
}
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index e85822d4971..8bb50ba3446 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -47,9 +47,10 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
"to specify the maximum amount of time in milliseconds to wait before
the group stabilizes.";
private static final String COMMAND_CONFIG_DOC = "Property file containing
configs to be passed to Admin Client.";
private static final String RESET_OFFSETS_DOC = "Reset offsets of share
group. Supports one share group at the time, and instances must be inactive." +
NL +
- "Has 2 execution options: --dry-run (the default) to plan which
offsets to reset, and --execute to reset the offsets. " + NL +
+ "Has 2 execution options: --dry-run to plan which offsets to reset,
and --execute to reset the offsets. " + NL +
"You must choose one of the following reset specifications:
--to-datetime, --to-earliest, --to-latest." + NL +
- "To define the scope use --all-topics or --topic.";
+ "To define the scope use --all-topics or --topic." + NL +
+ "Fails if neither '--dry-run' nor '--execute' is specified.";
private static final String DRY_RUN_DOC = "Only show results without
executing changes on share groups. Supported operations: reset-offsets.";
private static final String EXECUTE_DOC = "Execute operation. Supported
operations: reset-offsets.";
private static final String RESET_TO_DATETIME_DOC = "Reset offsets to
offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'";
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index a69e0bbe3a4..058485727e5 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -51,11 +51,12 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
private static final String OFFSETS_DOC = "Describe the group and list all
topic partitions in the group along with their offset information." +
"This is the default sub-action and may be used with the '--describe'
option only.";
private static final String RESET_OFFSETS_DOC = "Reset offsets of streams
group. The instances should be inactive." + NL +
- "Has 2 execution options: --dry-run (the default) to plan which
offsets to reset, and --execute to update the offsets." + NL +
+ "Has 2 execution options: --dry-run to plan which offsets to reset,
and --execute to update the offsets." + NL +
"If you use --execute, all internal topics linked to the group will
also be deleted." + NL +
"You must choose one of the following reset specifications:
--to-datetime, --by-duration, --to-earliest, " +
"--to-latest, --shift-by, --from-file, --to-current, --to-offset." +
NL +
- "To define the scope use --all-input-topics or --input-topic. One
scope must be specified unless you use '--from-file'.";
+ "To define the scope use --all-input-topics or --input-topic. One
scope must be specified unless you use '--from-file'." + NL +
+ "Fails if neither '--dry-run' nor '--execute' is specified.";
private static final String DRY_RUN_DOC = "Only show results without
executing changes on streams group. Supported operations: reset-offsets.";
private static final String EXECUTE_DOC = "Execute operation. Supported
operations: reset-offsets.";
private static final String EXPORT_DOC = "Export operation execution to a
CSV file. Supported operations: reset-offsets.";
@@ -269,12 +270,8 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
if (options.has(dryRunOpt) && options.has(executeOpt))
CommandLineUtils.printUsageAndExit(parser, "Option " +
resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt);
- if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
- System.err.println("WARN: No action will be performed as the
--execute option is missing. " +
- "In a future major release, the default behavior of this
command will be to prompt the user before " +
- "executing the reset rather than doing a dry run. You should
add the --dry-run option explicitly " +
- "if you are scripting this command and want to keep the
current default behavior without prompting.");
- }
+ if (!options.has(dryRunOpt) && !options.has(executeOpt))
+ CommandLineUtils.printUsageAndExit(parser, "Option " +
resetOffsetsOpt + " takes the option: " + executeOpt + " or " + dryRunOpt);
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
index d5a86c70305..219335dc200 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
@@ -141,7 +141,7 @@ public class ResetStreamsGroupOffsetTest {
@Test
public void testResetOffsetsWithoutGroupOption() {
- final String[] args = new String[]{"--bootstrap-server",
bootstrapServers, "--reset-offsets", "--to-offset", "5"};
+ final String[] args = new String[]{"--bootstrap-server",
bootstrapServers, "--reset-offsets", "--dry-run", "--to-offset", "5"};
AtomicBoolean exited = new AtomicBoolean(false);
Exit.setExitProcedure(((statusCode, message) -> {
assertNotEquals(0, statusCode);
@@ -156,9 +156,26 @@ public class ResetStreamsGroupOffsetTest {
}
}
+ @Test
+ public void testResetOffsetsWithoutDryRunOrExecuteOption() {
+ final String[] args = new String[]{"--bootstrap-server",
bootstrapServers, "--reset-offsets", "--all-groups", "--all-input-topics",
"--to-offset", "5"};
+ AtomicBoolean exited = new AtomicBoolean(false);
+ Exit.setExitProcedure(((statusCode, message) -> {
+ assertNotEquals(0, statusCode);
+ assertTrue(message.contains("Option [reset-offsets] takes the
option: [execute] or [dry-run]"));
+ exited.set(true);
+ }));
+ try {
+ getStreamsGroupService(args);
+ } finally {
+ assertTrue(exited.get());
+ Exit.resetExitProcedure();
+ }
+ }
+
@Test
public void testResetOffsetsWithDeleteInternalTopicsOption() {
- final String[] args = new String[]{"--bootstrap-server",
bootstrapServers, "--reset-offsets", "--all-groups", "--all-input-topics",
"--to-offset", "5", "--delete-all-internal-topics"};
+ final String[] args = new String[]{"--bootstrap-server",
bootstrapServers, "--reset-offsets", "--dry-run", "--all-groups",
"--all-input-topics", "--to-offset", "5", "--delete-all-internal-topics"};
AtomicBoolean exited = new AtomicBoolean(false);
Exit.setExitProcedure(((statusCode, message) -> {
assertNotEquals(0, statusCode);
@@ -247,7 +264,7 @@ public class ResetStreamsGroupOffsetTest {
resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 20L, 0L,
0, 1);
// export to file
- args = new String[]{"--bootstrap-server", bootstrapServers,
"--reset-offsets", "--group", appId, "--input-topic", topic1, "--to-offset",
"5", "--export"};
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1,
"--to-offset", "5", "--export"};
File file = TestUtils.tempFile("reset", ".csv");
Map<TopicPartition, Long> exp = Map.of(new TopicPartition(topic1, 0),
5L, new TopicPartition(topic1, 1), 5L);
try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args)) {
@@ -256,7 +273,7 @@ public class ResetStreamsGroupOffsetTest {
assertEquals(exp, toOffsetMap(exportedOffsets.get(appId)));
}
- args = new String[]{"--bootstrap-server", bootstrapServers,
"--reset-offsets", "--group", appId, "--input-topic", topic1, "--from-file",
file.getCanonicalPath()};
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1,
"--from-file", file.getCanonicalPath()};
try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args)) {
Map<String, Map<TopicPartition, OffsetAndMetadata>>
importedOffsets = service.resetOffsets();
assertEquals(exp, toOffsetMap(importedOffsets.get(appId)));
@@ -279,7 +296,7 @@ public class ResetStreamsGroupOffsetTest {
new TopicPartition(topic1, 1), 5L,
new TopicPartition(topic2, 1), 5L);
- resetOffsetsAndAssert(args, appId, List.of(topic1, topic2),
expectedOffsets,
+ resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, List.of(topic1,
topic2), expectedOffsets,
Map.of(
new TopicPartition(topic1, 0), 10L,
new TopicPartition(topic1, 1), 10L,
@@ -307,7 +324,7 @@ public class ResetStreamsGroupOffsetTest {
resetForNextTest(appId, 10L, topic1, topic2);
// export to file
- args = new String[]{"--bootstrap-server", bootstrapServers,
"--reset-offsets", "--group", appId, "--all-input-topics", "--to-offset", "5",
"--export"};
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--reset-offsets", "--dry-run", "--group", appId, "--all-input-topics",
"--to-offset", "5", "--export"};
file = TestUtils.tempFile("reset-all", ".csv");
exp = Map.of(new TopicPartition(topic1, 0), 5L,
new TopicPartition(topic1, 1), 5L,
@@ -319,7 +336,7 @@ public class ResetStreamsGroupOffsetTest {
assertEquals(exp, toOffsetMap(exportedOffsets.get(appId)));
}
- args = new String[]{"--bootstrap-server", bootstrapServers,
"--reset-offsets", "--group", appId, "--input-topic", topic1, "--from-file",
file.getCanonicalPath()};
+ args = new String[]{"--bootstrap-server", bootstrapServers,
"--reset-offsets", "--dry-run", "--group", appId, "--input-topic", topic1,
"--from-file", file.getCanonicalPath()};
try (StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args)) {
Map<String, Map<TopicPartition, OffsetAndMetadata>>
importedOffsets = service.resetOffsets();
@@ -577,7 +594,6 @@ public class ResetStreamsGroupOffsetTest {
long expectedOffset,
long
expectedCommittedOffset,
int... partitions)
throws ExecutionException, InterruptedException {
- resetOffsetsAndAssert(args, appId, topic, expectedOffset,
expectedCommittedOffset, partitions);
resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic,
expectedOffset, expectedCommittedOffset, partitions);
resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic,
expectedOffset, expectedOffset, partitions);
}
@@ -588,7 +604,6 @@ public class ResetStreamsGroupOffsetTest {
String topic2,
long expectedOffset,
long expectedCommittedOffset) throws
ExecutionException, InterruptedException {
- resetOffsetsAndAssert(args, appId, topic1, topic2, expectedOffset,
expectedCommittedOffset);
resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic1, topic2,
expectedOffset, expectedCommittedOffset);
resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic1, topic2,
expectedOffset, expectedOffset);
}