This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f97b95c60a2 KAFKA-19498 Add include argument to ConsumerPerformance 
tool (#20221)
f97b95c60a2 is described below

commit f97b95c60a22cc34c6349bfe0c8596db28b0a17d
Author: Federico Valeri <fedeval...@gmail.com>
AuthorDate: Sun Aug 24 22:15:37 2025 +0200

    KAFKA-19498 Add include argument to ConsumerPerformance tool (#20221)
    
    This patch adds the --include argument to the ConsumerPerformance tool.
    
    ConsoleConsumer and ConsumerPerformance serve different purposes but
    share common functionality for message consumption. Currently, there's
    an inconsistency in their command-line interfaces:
    
    - ConsoleConsumer supports an --include argument that allows users to
    specify a regular expression pattern to filter topics for consumption
    - ConsumerPerformance lacks this topic filtering capability, requiring
    users to specify a single topic explicitly via the --topic argument
    
    This inconsistency creates two problems:
    
    - Similar tools should provide similar topic selection capabilities for
    better user experience
    - Users cannot test consumer performance across multiple topics or
    dynamically matching topic sets, making it difficult to test realistic
    scenarios
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 docs/upgrade.html                                  |   4 +
 .../apache/kafka/server/util/CommandLineUtils.java |  25 +++++
 .../kafka/server/util/CommandLineUtilsTest.java    | 106 +++++++++++++++++++++
 .../apache/kafka/tools/ConsumerPerformance.java    |  35 +++++--
 .../tools/consumer/ConsoleConsumerOptions.java     |   8 +-
 .../kafka/tools/ConsumerPerformanceTest.java       |  43 ++++++++-
 6 files changed, 206 insertions(+), 15 deletions(-)

diff --git a/docs/upgrade.html b/docs/upgrade.html
index f0f3f540738..1dbb7e2d2ee 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -40,6 +40,10 @@
     <li>
         The <code>PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG</code> in 
<code>ProducerConfig</code> was deprecated and will be removed in Kafka 5.0. 
Please use the <code>PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG</code> 
instead.
     </li>
+    <li>
+        The <code>ConsumerPerformance</code> command line tool has a new 
<code>include</code> option that is an alternative to the <code>topic</code> 
option.
+        This new option allows passing a regular expression specifying a list 
of topics to include for consumption, which is useful to test consumer 
performance across multiple topics or dynamically matching topic sets.
+    </li>
 </ul>
 
 <h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
index 6146daeb45c..c5b973f78e7 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
@@ -23,10 +23,12 @@ import org.apache.kafka.common.utils.Exit;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
@@ -135,6 +137,29 @@ public class CommandLineUtils {
         Exit.exit(1, message);
     }
 
+    /**
+     * Check that exactly one of a set of mutually exclusive arguments is 
present.
+     */
+    public static void checkOneOfArgs(OptionParser parser, OptionSet options, 
OptionSpec<?>... optionSpecs) {
+        if (optionSpecs == null || optionSpecs.length == 0) {
+            throw new IllegalArgumentException("At least one option must be 
provided");
+        }
+
+        int presentCount = 0;
+        for (OptionSpec<?> spec : optionSpecs) {
+            if (options.has(spec)) {
+                presentCount++;
+            }
+        }
+
+        if (presentCount != 1) {
+            printUsageAndExit(parser, "Exactly one of the following arguments 
is required: " +
+                    Arrays.stream(optionSpecs)
+                            .map(Object::toString)
+                            .collect(Collectors.joining(", ")));
+        }
+    }
+
     public static void printUsageAndExit(OptionParser parser, String message) {
         System.err.println(message);
         try {
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
index 8fdc6c89d06..a634b21403e 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.server.util;
 
+import org.apache.kafka.common.utils.Exit;
+
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
@@ -26,9 +28,12 @@ import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class CommandLineUtilsTest {
     @Test
@@ -266,4 +271,105 @@ public class CommandLineUtilsTest {
                 () -> 
CommandLineUtils.initializeBootstrapProperties(createTestProps(),
                     Optional.of("127.0.0.2:9094"), 
Optional.of("127.0.0.3:9095"))).getMessage());
     }
+
+    private OptionSpec<String> createMockOptionSpec(String name) {
+        OptionSpec<String> spec = mock(OptionSpec.class);
+        when(spec.toString()).thenReturn("[" + name.replaceAll("--", "") + 
"]");
+        return spec;
+    }
+
+    @Test
+    void testCheckOneOfArgsNoOptions() {
+        OptionParser parser = mock(OptionParser.class);
+        OptionSet options = mock(OptionSet.class);
+
+        IllegalArgumentException e = 
assertThrows(IllegalArgumentException.class, () ->
+                CommandLineUtils.checkOneOfArgs(parser, options)
+        );
+
+        assertEquals("At least one option must be provided", e.getMessage());
+    }
+
+    @Test
+    void testCheckOneOfArgsOnePresent() {
+        OptionParser parser = mock(OptionParser.class);
+        OptionSet options = mock(OptionSet.class);
+        OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
+        OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
+        OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
+
+        when(options.has(opt1)).thenReturn(true);
+        when(options.has(opt2)).thenReturn(false);
+        when(options.has(opt3)).thenReturn(false);
+
+        assertDoesNotThrow(() ->
+                CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, 
opt3)
+        );
+
+        when(options.has(opt1)).thenReturn(false);
+        when(options.has(opt2)).thenReturn(true);
+
+        assertDoesNotThrow(() ->
+                CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, 
opt3)
+        );
+
+        when(options.has(opt2)).thenReturn(false);
+        when(options.has(opt3)).thenReturn(true);
+
+        assertDoesNotThrow(() ->
+                CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, 
opt3)
+        );
+    }
+
+    @Test
+    void testCheckOneOfArgsNonePresent() {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        OptionParser parser = mock(OptionParser.class);
+        OptionSet options = mock(OptionSet.class);
+        OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
+        OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
+        OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
+
+        when(options.has(opt1)).thenReturn(false);
+        when(options.has(opt2)).thenReturn(false);
+        when(options.has(opt3)).thenReturn(false);
+
+        try {
+            IllegalArgumentException e = 
assertThrows(IllegalArgumentException.class,
+                    () -> CommandLineUtils.checkOneOfArgs(parser, options, 
opt1, opt2, opt3));
+            assertEquals("Exactly one of the following arguments is required: 
" +
+                    "[first-option], [second-option], [third-option]", 
e.getMessage());
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    void testCheckOneOfArgsMultiplePresent() {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        OptionParser parser = mock(OptionParser.class);
+        OptionSet options = mock(OptionSet.class);
+        OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
+        OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
+        OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
+
+        when(options.has(opt1)).thenReturn(true);
+        when(options.has(opt2)).thenReturn(true);
+        when(options.has(opt3)).thenReturn(false);
+
+        try {
+            IllegalArgumentException e = 
assertThrows(IllegalArgumentException.class,
+                    () -> CommandLineUtils.checkOneOfArgs(parser, options, 
opt1, opt2, opt3));
+            assertEquals("Exactly one of the following arguments is required: 
" +
+                    "[first-option], [second-option], [third-option]", 
e.getMessage());
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 60b4b37abe4..0892693801a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -37,11 +37,13 @@ import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 
 import joptsimple.OptionException;
 import joptsimple.OptionSpec;
@@ -134,8 +136,13 @@ public class ConsumerPerformance {
         long reportingIntervalMs = options.reportingIntervalMs();
         boolean showDetailedStats = options.showDetailedStats();
         SimpleDateFormat dateFormat = options.dateFormat();
-        consumer.subscribe(options.topic(),
-            new ConsumerPerfRebListener(joinTimeMs, joinStartMs, 
joinTimeMsInSingleRound));
+
+        ConsumerPerfRebListener listener = new 
ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound);
+        if (options.topic().isPresent()) {
+            consumer.subscribe(options.topic().get(), listener);
+        } else {
+            consumer.subscribe(options.include().get(), listener);
+        }
 
         // now start the benchmark
         long currentTimeMs = System.currentTimeMillis();
@@ -246,6 +253,7 @@ public class ConsumerPerformance {
     protected static class ConsumerPerfOptions extends CommandDefaultOptions {
         private final OptionSpec<String> bootstrapServerOpt;
         private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> includeOpt;
         private final OptionSpec<String> groupIdOpt;
         private final OptionSpec<Integer> fetchSizeOpt;
         private final OptionSpec<Void> resetBeginningOffsetOpt;
@@ -265,10 +273,14 @@ public class ConsumerPerformance {
                 .withRequiredArg()
                 .describedAs("server to connect to")
                 .ofType(String.class);
-            topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume 
from.")
+            topicOpt = parser.accepts("topic", "The topic to consume from.")
                 .withRequiredArg()
                 .describedAs("topic")
                 .ofType(String.class);
+            includeOpt = parser.accepts("include", "Regular expression 
specifying list of topics to include for consumption.")
+                .withRequiredArg()
+                .describedAs("Java regex (String)")
+                .ofType(String.class);
             groupIdOpt = parser.accepts("group", "The group id to consume on.")
                 .withRequiredArg()
                 .describedAs("gid")
@@ -323,7 +335,8 @@ public class ConsumerPerformance {
             }
             if (options != null) {
                 CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is 
used to verify the consumer performance.");
-                CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, 
numMessagesOpt);
+                CommandLineUtils.checkRequiredArgs(parser, options, 
numMessagesOpt);
+                CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, 
includeOpt);
             }
         }
 
@@ -353,8 +366,16 @@ public class ConsumerPerformance {
             return props;
         }
 
-        public Set<String> topic() {
-            return Set.of(options.valueOf(topicOpt));
+        public Optional<Collection<String>> topic() {
+            return options.has(topicOpt)
+                    ? Optional.of(List.of(options.valueOf(topicOpt)))
+                    : Optional.empty();
+        }
+
+        public Optional<Pattern> include() {
+            return options.has(includeOpt)
+                    ? Optional.of(Pattern.compile(options.valueOf(includeOpt)))
+                    : Optional.empty();
         }
 
         public long numMessages() {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
index cf4daa0c636..60ee8f61ffa 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -25,10 +25,8 @@ import org.apache.kafka.server.util.CommandDefaultOptions;
 import org.apache.kafka.server.util.CommandLineUtils;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
@@ -185,12 +183,8 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
     }
 
     private void checkRequiredArgs() {
-        List<Optional<String>> topicOrFilterArgs = new 
ArrayList<>(List.of(topicArg(), includedTopicsArg()));
-        topicOrFilterArgs.removeIf(Optional::isEmpty);
         // user need to specify value for either --topic or --include options
-        if (topicOrFilterArgs.size() != 1) {
-            CommandLineUtils.printUsageAndExit(parser, "Exactly one of 
--include/--topic is required. ");
-        }
+        CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
 
         if (partitionArg().isPresent()) {
             if (!options.has(topicOpt)) {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
index 270fab2cf80..d78b65e54a3 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -75,7 +75,7 @@ public class ConsumerPerformanceTest {
         ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
 
         assertEquals("localhost:9092", config.brokerHostsAndPorts());
-        assertTrue(config.topic().contains("test"));
+        assertTrue(config.topic().get().contains("test"));
         assertEquals(10, config.numMessages());
     }
 
@@ -93,6 +93,47 @@ public class ConsumerPerformanceTest {
         assertTrue(err.contains("new-consumer is not a recognized option"));
     }
 
+    @Test
+    public void testConfigWithInclude() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--include", "test.*",
+            "--messages", "10"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("localhost:9092", config.brokerHostsAndPorts());
+        assertTrue(config.include().get().toString().contains("test.*"));
+        assertEquals(10, config.numMessages());
+    }
+
+    @Test
+    public void testConfigWithTopicAndInclude() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--include", "test.*",
+            "--messages", "10"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() -> new 
ConsumerPerformance.ConsumerPerfOptions(args));
+
+        assertTrue(err.contains("Exactly one of the following arguments is 
required: [topic], [include]"));
+    }
+
+    @Test
+    public void testConfigWithoutTopicAndInclude() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--messages", "10"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() -> new 
ConsumerPerformance.ConsumerPerfOptions(args));
+
+        assertTrue(err.contains("Exactly one of the following arguments is 
required: [topic], [include]"));
+    }
+
     @Test
     public void testClientIdOverride() throws IOException {
         File tempFile = 
Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();

Reply via email to