This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f97b95c60a2 KAFKA-19498 Add include argument to ConsumerPerformance tool (#20221) f97b95c60a2 is described below commit f97b95c60a22cc34c6349bfe0c8596db28b0a17d Author: Federico Valeri <fedeval...@gmail.com> AuthorDate: Sun Aug 24 22:15:37 2025 +0200 KAFKA-19498 Add include argument to ConsumerPerformance tool (#20221) This patch adds the include argument to the ConsumerPerformance tool. ConsoleConsumer and ConsumerPerformance serve different purposes but share common functionality for message consumption. Currently, there's an inconsistency in their command-line interfaces: - ConsoleConsumer supports an --include argument that allows users to specify a regular expression pattern to filter topics for consumption - ConsumerPerformance lacks this topic filtering capability, requiring users to specify a single topic explicitly via the --topic argument This inconsistency creates two problems: - Similar tools should provide similar topic selection capabilities for a better user experience - Users cannot test consumer performance across multiple topics or dynamically matching topic sets, making it difficult to test realistic scenarios Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- docs/upgrade.html | 4 + .../apache/kafka/server/util/CommandLineUtils.java | 25 +++++ .../kafka/server/util/CommandLineUtilsTest.java | 106 +++++++++++++++++++++ .../apache/kafka/tools/ConsumerPerformance.java | 35 +++++-- .../tools/consumer/ConsoleConsumerOptions.java | 8 +- .../kafka/tools/ConsumerPerformanceTest.java | 43 ++++++++- 6 files changed, 206 insertions(+), 15 deletions(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index f0f3f540738..1dbb7e2d2ee 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -40,6 +40,10 @@ <li> The <code>PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG</code> in <code>ProducerConfig</code> was deprecated and will be removed in Kafka 5.0. 
Please use the <code>PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG</code> instead. </li> + <li> + The <code>ConsumerPerformance</code> command line tool has a new <code>include</code> option that is alternative to the <code>topic</code> option. + This new option allows to pass a regular expression specifying a list of topics to include for consumption, which is useful to test consumer performance across multiple topics or dynamically matching topic sets. + </li> </ul> <h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4> diff --git a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java index 6146daeb45c..c5b973f78e7 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java @@ -23,10 +23,12 @@ import org.apache.kafka.common.utils.Exit; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import joptsimple.OptionParser; import joptsimple.OptionSet; @@ -135,6 +137,29 @@ public class CommandLineUtils { Exit.exit(1, message); } + /** + * Check that exactly one of a set of mutually exclusive arguments is present. + */ + public static void checkOneOfArgs(OptionParser parser, OptionSet options, OptionSpec<?>... 
optionSpecs) { + if (optionSpecs == null || optionSpecs.length == 0) { + throw new IllegalArgumentException("At least one option must be provided"); + } + + int presentCount = 0; + for (OptionSpec<?> spec : optionSpecs) { + if (options.has(spec)) { + presentCount++; + } + } + + if (presentCount != 1) { + printUsageAndExit(parser, "Exactly one of the following arguments is required: " + + Arrays.stream(optionSpecs) + .map(Object::toString) + .collect(Collectors.joining(", "))); + } + } + public static void printUsageAndExit(OptionParser parser, String message) { System.err.println(message); try { diff --git a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java index 8fdc6c89d06..a634b21403e 100644 --- a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.server.util; +import org.apache.kafka.common.utils.Exit; + import org.junit.jupiter.api.Test; import java.util.List; @@ -26,9 +28,12 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class CommandLineUtilsTest { @Test @@ -266,4 +271,105 @@ public class CommandLineUtilsTest { () -> CommandLineUtils.initializeBootstrapProperties(createTestProps(), Optional.of("127.0.0.2:9094"), Optional.of("127.0.0.3:9095"))).getMessage()); } + + private OptionSpec<String> createMockOptionSpec(String name) { + OptionSpec<String> spec = mock(OptionSpec.class); + 
when(spec.toString()).thenReturn("[" + name.replaceAll("--", "") + "]"); + return spec; + } + + @Test + void testCheckOneOfArgsNoOptions() { + OptionParser parser = mock(OptionParser.class); + OptionSet options = mock(OptionSet.class); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> + CommandLineUtils.checkOneOfArgs(parser, options) + ); + + assertEquals("At least one option must be provided", e.getMessage()); + } + + @Test + void testCheckOneOfArgsOnePresent() { + OptionParser parser = mock(OptionParser.class); + OptionSet options = mock(OptionSet.class); + OptionSpec<String> opt1 = createMockOptionSpec("--first-option"); + OptionSpec<String> opt2 = createMockOptionSpec("--second-option"); + OptionSpec<String> opt3 = createMockOptionSpec("--third-option"); + + when(options.has(opt1)).thenReturn(true); + when(options.has(opt2)).thenReturn(false); + when(options.has(opt3)).thenReturn(false); + + assertDoesNotThrow(() -> + CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3) + ); + + when(options.has(opt1)).thenReturn(false); + when(options.has(opt2)).thenReturn(true); + + assertDoesNotThrow(() -> + CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3) + ); + + when(options.has(opt2)).thenReturn(false); + when(options.has(opt3)).thenReturn(true); + + assertDoesNotThrow(() -> + CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3) + ); + } + + @Test + void testCheckOneOfArgsNonePresent() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + OptionParser parser = mock(OptionParser.class); + OptionSet options = mock(OptionSet.class); + OptionSpec<String> opt1 = createMockOptionSpec("--first-option"); + OptionSpec<String> opt2 = createMockOptionSpec("--second-option"); + OptionSpec<String> opt3 = createMockOptionSpec("--third-option"); + + when(options.has(opt1)).thenReturn(false); + when(options.has(opt2)).thenReturn(false); + 
when(options.has(opt3)).thenReturn(false); + + try { + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, + () -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)); + assertEquals("Exactly one of the following arguments is required: " + + "[first-option], [second-option], [third-option]", e.getMessage()); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + void testCheckOneOfArgsMultiplePresent() { + Exit.setExitProcedure((code, message) -> { + throw new IllegalArgumentException(message); + }); + + OptionParser parser = mock(OptionParser.class); + OptionSet options = mock(OptionSet.class); + OptionSpec<String> opt1 = createMockOptionSpec("--first-option"); + OptionSpec<String> opt2 = createMockOptionSpec("--second-option"); + OptionSpec<String> opt3 = createMockOptionSpec("--third-option"); + + when(options.has(opt1)).thenReturn(true); + when(options.has(opt2)).thenReturn(true); + when(options.has(opt3)).thenReturn(false); + + try { + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, + () -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)); + assertEquals("Exactly one of the following arguments is required: " + + "[first-option], [second-option], [third-option]", e.getMessage()); + } finally { + Exit.resetExitProcedure(); + } + } } diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java index 60b4b37abe4..0892693801a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java @@ -37,11 +37,13 @@ import java.io.IOException; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Random; -import java.util.Set; import 
java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; import joptsimple.OptionException; import joptsimple.OptionSpec; @@ -134,8 +136,13 @@ public class ConsumerPerformance { long reportingIntervalMs = options.reportingIntervalMs(); boolean showDetailedStats = options.showDetailedStats(); SimpleDateFormat dateFormat = options.dateFormat(); - consumer.subscribe(options.topic(), - new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound)); + + ConsumerPerfRebListener listener = new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound); + if (options.topic().isPresent()) { + consumer.subscribe(options.topic().get(), listener); + } else { + consumer.subscribe(options.include().get(), listener); + } // now start the benchmark long currentTimeMs = System.currentTimeMillis(); @@ -246,6 +253,7 @@ public class ConsumerPerformance { protected static class ConsumerPerfOptions extends CommandDefaultOptions { private final OptionSpec<String> bootstrapServerOpt; private final OptionSpec<String> topicOpt; + private final OptionSpec<String> includeOpt; private final OptionSpec<String> groupIdOpt; private final OptionSpec<Integer> fetchSizeOpt; private final OptionSpec<Void> resetBeginningOffsetOpt; @@ -265,10 +273,14 @@ public class ConsumerPerformance { .withRequiredArg() .describedAs("server to connect to") .ofType(String.class); - topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + topicOpt = parser.accepts("topic", "The topic to consume from.") .withRequiredArg() .describedAs("topic") .ofType(String.class); + includeOpt = parser.accepts("include", "Regular expression specifying list of topics to include for consumption.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(String.class); groupIdOpt = parser.accepts("group", "The group id to consume on.") .withRequiredArg() .describedAs("gid") @@ -323,7 +335,8 @@ public class ConsumerPerformance { } if (options != null) { 
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance."); - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt); + CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt); + CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); } } @@ -353,8 +366,16 @@ public class ConsumerPerformance { return props; } - public Set<String> topic() { - return Set.of(options.valueOf(topicOpt)); + public Optional<Collection<String>> topic() { + return options.has(topicOpt) + ? Optional.of(List.of(options.valueOf(topicOpt))) + : Optional.empty(); + } + + public Optional<Pattern> include() { + return options.has(includeOpt) + ? Optional.of(Pattern.compile(options.valueOf(includeOpt))) + : Optional.empty(); } public long numMessages() { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java index cf4daa0c636..60ee8f61ffa 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java @@ -25,10 +25,8 @@ import org.apache.kafka.server.util.CommandDefaultOptions; import org.apache.kafka.server.util.CommandLineUtils; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -185,12 +183,8 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions { } private void checkRequiredArgs() { - List<Optional<String>> topicOrFilterArgs = new ArrayList<>(List.of(topicArg(), includedTopicsArg())); - topicOrFilterArgs.removeIf(Optional::isEmpty); // user need to specify value for either --topic or --include options - if (topicOrFilterArgs.size() != 1) { - CommandLineUtils.printUsageAndExit(parser, "Exactly 
one of --include/--topic is required. "); - } + CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt); if (partitionArg().isPresent()) { if (!options.has(topicOpt)) { diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java index 270fab2cf80..d78b65e54a3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java @@ -75,7 +75,7 @@ public class ConsumerPerformanceTest { ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); assertEquals("localhost:9092", config.brokerHostsAndPorts()); - assertTrue(config.topic().contains("test")); + assertTrue(config.topic().get().contains("test")); assertEquals(10, config.numMessages()); } @@ -93,6 +93,47 @@ public class ConsumerPerformanceTest { assertTrue(err.contains("new-consumer is not a recognized option")); } + @Test + public void testConfigWithInclude() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--include", "test.*", + "--messages", "10" + }; + + ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args); + + assertEquals("localhost:9092", config.brokerHostsAndPorts()); + assertTrue(config.include().get().toString().contains("test.*")); + assertEquals(10, config.numMessages()); + } + + @Test + public void testConfigWithTopicAndInclude() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--include", "test.*", + "--messages", "10" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args)); + + assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]")); + } + + @Test + public void testConfigWithoutTopicAndInclude() { + String[] args = new String[]{ + 
"--bootstrap-server", "localhost:9092", + "--messages", "10" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args)); + + assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]")); + } + @Test public void testClientIdOverride() throws IOException { File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();