This is an automated email from the ASF dual-hosted git repository. frankvicky pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f1e9aa1c655 MINOR: Fix flaky tests in Tools modules (#20225) f1e9aa1c655 is described below commit f1e9aa1c6553855206f52b17c50eed537a6611e2 Author: Sanskar Jhajharia <sjhajha...@confluent.io> AuthorDate: Wed Jul 23 12:10:18 2025 +0530 MINOR: Fix flaky tests in Tools modules (#20225) ### Problem The `ShareGroupCommandTest.testDeleteShareGroupOffsetsArgsWithoutTopic()`, `ShareGroupCommandTest.testDeleteShareGroupOffsetsArgsWithoutGroup()`, `ResetStreamsGroupOffsetTest.testResetOffsetsWithoutGroupOption()`, `DeleteStreamsGroupTest.testDeleteWithoutGroupOption()`, and `DescribeStreamsGroupTest.testDescribeWithoutGroupOption()` tests were flaky due to a dependency on Set iteration order in error message generation. ### Root Cause The cleanup [commit](https://github.com/apache/kafka/pull/20091) that replaced `new HashSet<>(Arrays.asList(...))` with `Set.of(...)` in ShareGroupCommandOptions and StreamsGroupCommandOptions changed the iteration characteristics of the collections used for error message generation: the iteration order of a set created with `Set.of(...)` is unspecified and can vary between JVM runs. This produces different orders like `[topic], [group]` vs `[group], [topic]`, but the tests expected a specific order, causing intermittent failures. ### Solution Fix the root cause by ensuring deterministic error message generation through alphabetical sorting of option names. 
Reviewers: ShivsundarR <s...@confluent.io>, Ken Huang <s7133...@gmail.com>, TengYao Chi <frankvi...@apache.org> --- .../kafka/tools/consumer/group/ShareGroupCommandOptions.java | 6 +++--- .../kafka/tools/streams/StreamsGroupCommandOptions.java | 12 ++++++------ .../kafka/tools/consumer/group/ShareGroupCommandTest.java | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java index 9a96cad00ed..f155a9c4b5e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -159,11 +159,11 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { if (options.has(describeOpt)) { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt, offsetsOpt, stateOpt); if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 
1 : 0).sum() > 1) { CommandLineUtils.printUsageAndExit(parser, - "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); } if (options.has(stateOpt) && options.valueOf(stateOpt) != null) CommandLineUtils.printUsageAndExit(parser, @@ -185,7 +185,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { if (options.has(deleteOffsetsOpt)) { if (!options.has(groupOpt) || !options.has(topicOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + deleteOffsetsOpt + " takes the following options: " + allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + deleteOffsetsOpt + " takes the following options: " + allDeleteOffsetsOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); } if (options.has(resetOffsetsOpt)) { diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java index 6ce387d3dbd..d6715f833d2 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java @@ -222,7 +222,7 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { if (options.has(deleteOpt)) { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + deleteOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + deleteOpt + " takes one of these options: " + 
allGroupSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); if (options.has(inputTopicOpt) || options.has(allInputTopicsOpt)) CommandLineUtils.printUsageAndExit(parser, "Kafka Streams does not support topic-specific offset " + "deletion from a streams group."); @@ -253,11 +253,11 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt, offsetsOpt, stateOpt); if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) { CommandLineUtils.printUsageAndExit(parser, - "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); } if (options.has(stateOpt) && options.valueOf(stateOpt) != null) CommandLineUtils.printUsageAndExit(parser, @@ -267,7 +267,7 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { private void checkDeleteOffsetsArgs() { if ((!options.has(inputTopicOpt) && !options.has(allInputTopicsOpt)) || !options.has(groupOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + deleteOffsetsOpt + " takes the " + groupOpt + " and one of these options: " + allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + deleteOffsetsOpt + " takes the " + groupOpt + " and one of 
these options: " + allDeleteOffsetsOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); if (options.valuesOf(groupOpt).size() > 1) CommandLineUtils.printUsageAndExit(parser, "Option " + deleteOffsetsOpt + " supports only one " + groupOpt + " at a time, but found: " + options.valuesOf(groupOpt)); @@ -286,7 +286,7 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + resetOffsetsOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + resetOffsetsOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, minus(allResetOffsetScenarioOpts, resetToOffsetOpt)); CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt)); @@ -301,7 +301,7 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { private void checkDeleteAllInternalTopicsArgs() { if (!options.has(resetOffsetsOpt) && !options.has(deleteOpt)) { CommandLineUtils.printUsageAndExit(parser, - "Option " + deleteAllInternalTopicsOpt + " takes one of these options: " + allDeleteInternalGroupsOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + "Option " + deleteAllInternalTopicsOpt + " takes one of these options: " + allDeleteInternalGroupsOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(", "))); } else if (options.has(resetOffsetsOpt) && !options.has(executeOpt)) { CommandLineUtils.printUsageAndExit(parser, "Option " + deleteAllInternalTopicsOpt + " takes " + executeOpt + " when " + resetOffsetsOpt + " is used."); diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index b7f59f1a991..19a11ce0f90 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -626,7 +626,7 @@ public class ShareGroupCommandTest { AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); - assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [group], [topic]")); exited.set(true); })); try { @@ -646,7 +646,7 @@ public class ShareGroupCommandTest { AtomicBoolean exited = new AtomicBoolean(false); Exit.setExitProcedure(((statusCode, message) -> { assertNotEquals(0, statusCode); - assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [group], [topic]")); exited.set(true); })); try {