This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dbe9d34e474 KAFKA-19624: Improve consistency of command-line arguments for consumer performance tests (#20385)
dbe9d34e474 is described below

commit dbe9d34e47452059b78a73f782c4911ec7e366f7
Author: ally heev <[email protected]>
AuthorDate: Tue Sep 23 14:31:40 2025 +0530

    KAFKA-19624: Improve consistency of command-line arguments for consumer performance tests (#20385)
    
    resolves https://issues.apache.org/jira/browse/KAFKA-19624
    
    Reviewers: @brandboat, @AndrewJSchofield, @m1a2st
---
 .../services/performance/consumer_performance.py   |  15 ++-
 .../performance/share_consumer_performance.py      |  17 ++-
 .../apache/kafka/tools/ConsumerPerformance.java    | 133 +++++++++++++------
 .../kafka/tools/ShareConsumerPerformance.java      | 145 ++++++++++++++-------
 .../kafka/tools/ConsumerPerformanceTest.java       | 136 +++++++++++++++++--
 .../kafka/tools/ShareConsumerPerformanceTest.java  | 132 +++++++++++++++++--
 6 files changed, 458 insertions(+), 120 deletions(-)

diff --git a/tests/kafkatest/services/performance/consumer_performance.py 
b/tests/kafkatest/services/performance/consumer_performance.py
index 28086e82818..aa414e5a89f 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService):
         "socket-buffer-size", "The size of the tcp RECV size."
 
         "new-consumer", "Use the new consumer implementation."
-        "consumer.config", "Consumer config properties file."
+        "command-config", "Config properties file."
     """
 
     # Root directory for persistent output
@@ -83,10 +83,14 @@ class ConsumerPerformanceService(PerformanceService):
     def args(self, version):
         """Dictionary of arguments used to start the Consumer Performance 
script."""
         args = {
-            'topic': self.topic,
-            'messages': self.messages
+            'topic': self.topic
         }
 
+        if version.supports_command_config():
+            args['num-records'] = self.messages
+        else:
+            args['messages'] = self.messages
+
         if version < V_2_5_0:
             args['broker-list'] = 
self.kafka.bootstrap_servers(self.security_config.security_protocol)
         else:
@@ -115,7 +119,10 @@ class ConsumerPerformanceService(PerformanceService):
         for key, value in self.args(node.version).items():
             cmd += " --%s %s" % (key, value)
 
-        cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
+        if node.version.supports_command_config():
+            cmd += " --command-config %s" % 
ConsumerPerformanceService.CONFIG_FILE
+        else:
+            cmd += " --consumer.config %s" % 
ConsumerPerformanceService.CONFIG_FILE
 
         for key, value in self.settings.items():
             cmd += " %s=%s" % (str(key), str(value))
diff --git a/tests/kafkatest/services/performance/share_consumer_performance.py 
b/tests/kafkatest/services/performance/share_consumer_performance.py
index ccb09524580..432d1e1da81 100644
--- a/tests/kafkatest/services/performance/share_consumer_performance.py
+++ b/tests/kafkatest/services/performance/share_consumer_performance.py
@@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService):
 
         "socket-buffer-size", "The size of the tcp RECV size."
 
-        "consumer.config", "Consumer config properties file."
+        "command-config", "Config properties file."
     """
 
     # Root directory for persistent output
@@ -73,16 +73,20 @@ class ShareConsumerPerformanceService(PerformanceService):
         for node in self.nodes:
             node.version = version
 
-    def args(self):
+    def args(self, version):
         """Dictionary of arguments used to start the Share Consumer 
Performance script."""
         args = {
             'topic': self.topic,
-            'messages': self.messages,
             'bootstrap-server': 
self.kafka.bootstrap_servers(self.security_config.security_protocol),
             'group': self.group,
             'timeout': self.timeout
         }
 
+        if version.supports_command_config():
+            args['num-records'] = self.messages
+        else:
+            args['messages'] = self.messages
+
         if self.fetch_size is not None:
             args['fetch-size'] = self.fetch_size
 
@@ -97,10 +101,13 @@ class ShareConsumerPerformanceService(PerformanceService):
         cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
         cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\";" % 
(get_log4j_config_param(node), get_log4j_config_for_tools(node))
         cmd += " %s" % self.path.script("kafka-share-consumer-perf-test.sh", 
node)
-        for key, value in self.args().items():
+        for key, value in self.args(node.version).items():
             cmd += " --%s %s" % (key, value)
 
-        cmd += " --consumer.config %s" % 
ShareConsumerPerformanceService.CONFIG_FILE
+        if node.version.supports_command_config():
+            cmd += " --command-config %s" % 
ShareConsumerPerformanceService.CONFIG_FILE
+        else:
+            cmd += " --consumer.config %s" % 
ShareConsumerPerformanceService.CONFIG_FILE
 
         for key, value in self.settings.items():
             cmd += " %s=%s" % (str(key), str(value))
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 62def15d324..0334af83aa1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -48,6 +48,7 @@ import joptsimple.OptionException;
 import joptsimple.OptionSpec;
 
 import static joptsimple.util.RegexMatcher.regex;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
 
 public class ConsumerPerformance {
     private static final Logger LOG = 
LoggerFactory.getLogger(ConsumerPerformance.class);
@@ -61,7 +62,7 @@ public class ConsumerPerformance {
         try {
             LOG.info("Starting consumer...");
             ConsumerPerfOptions options = new ConsumerPerfOptions(args);
-            AtomicLong totalMessagesRead = new AtomicLong(0);
+            AtomicLong totalRecordsRead = new AtomicLong(0);
             AtomicLong totalBytesRead = new AtomicLong(0);
             AtomicLong joinTimeMs = new AtomicLong(0);
             AtomicLong joinTimeMsInSingleRound = new AtomicLong(0);
@@ -71,14 +72,14 @@ public class ConsumerPerformance {
 
             try (Consumer<byte[], byte[]> consumer = 
consumerCreator.apply(options.props())) {
                 long bytesRead = 0L;
-                long messagesRead = 0L;
+                long recordsRead = 0L;
                 long lastBytesRead = 0L;
-                long lastMessagesRead = 0L;
+                long lastRecordsRead = 0L;
                 long currentTimeMs = System.currentTimeMillis();
                 long joinStartMs = currentTimeMs;
                 long startMs = currentTimeMs;
-                consume(consumer, options, totalMessagesRead, totalBytesRead, 
joinTimeMs,
-                    bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
+                consume(consumer, options, totalRecordsRead, totalBytesRead, 
joinTimeMs,
+                    bytesRead, recordsRead, lastBytesRead, lastRecordsRead,
                     joinStartMs, joinTimeMsInSingleRound);
                 long endMs = System.currentTimeMillis();
 
@@ -92,12 +93,12 @@ public class ConsumerPerformance {
                         options.dateFormat().format(endMs),
                         totalMbRead,
                         totalMbRead / elapsedSec,
-                        totalMessagesRead.get(),
-                        totalMessagesRead.get() / elapsedSec,
+                        totalRecordsRead.get(),
+                        totalRecordsRead.get() / elapsedSec,
                         joinTimeMs.get(),
                         fetchTimeInMs,
                         totalMbRead / (fetchTimeInMs / 1000.0),
-                        totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
+                        totalRecordsRead.get() / (fetchTimeInMs / 1000.0)
                     );
                 }
 
@@ -122,16 +123,16 @@ public class ConsumerPerformance {
 
     private static void consume(Consumer<byte[], byte[]> consumer,
                                 ConsumerPerfOptions options,
-                                AtomicLong totalMessagesRead,
+                                AtomicLong totalRecordsRead,
                                 AtomicLong totalBytesRead,
                                 AtomicLong joinTimeMs,
                                 long bytesRead,
-                                long messagesRead,
+                                long recordsRead,
                                 long lastBytesRead,
-                                long lastMessagesRead,
+                                long lastRecordsRead,
                                 long joinStartMs,
                                 AtomicLong joinTimeMsInSingleRound) {
-        long numMessages = options.numMessages();
+        long numRecords = options.numRecords();
         long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
         long reportingIntervalMs = options.reportingIntervalMs();
         boolean showDetailedStats = options.showDetailedStats();
@@ -149,55 +150,55 @@ public class ConsumerPerformance {
         long lastReportTimeMs = currentTimeMs;
         long lastConsumedTimeMs = currentTimeMs;
 
-        while (messagesRead < numMessages && currentTimeMs - 
lastConsumedTimeMs <= recordFetchTimeoutMs) {
+        while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs 
<= recordFetchTimeoutMs) {
             ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(100));
             currentTimeMs = System.currentTimeMillis();
             if (!records.isEmpty())
                 lastConsumedTimeMs = currentTimeMs;
             for (ConsumerRecord<byte[], byte[]> record : records) {
-                messagesRead += 1;
+                recordsRead += 1;
                 if (record.key() != null)
                     bytesRead += record.key().length;
                 if (record.value() != null)
                     bytesRead += record.value().length;
                 if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
                     if (showDetailedStats)
-                        printConsumerProgress(0, bytesRead, lastBytesRead, 
messagesRead, lastMessagesRead,
+                        printConsumerProgress(0, bytesRead, lastBytesRead, 
recordsRead, lastRecordsRead,
                             lastReportTimeMs, currentTimeMs, dateFormat, 
joinTimeMsInSingleRound.get());
                     joinTimeMsInSingleRound = new AtomicLong(0);
                     lastReportTimeMs = currentTimeMs;
-                    lastMessagesRead = messagesRead;
+                    lastRecordsRead = recordsRead;
                     lastBytesRead = bytesRead;
                 }
             }
         }
 
-        if (messagesRead < numMessages)
-            System.out.printf("WARNING: Exiting before consuming the expected 
number of messages: timeout (%d ms) exceeded. " +
+        if (recordsRead < numRecords)
+            System.out.printf("WARNING: Exiting before consuming the expected 
number of records: timeout (%d ms) exceeded. " +
                 "You can use the --timeout option to increase the timeout.%n", 
recordFetchTimeoutMs);
-        totalMessagesRead.set(messagesRead);
+        totalRecordsRead.set(recordsRead);
         totalBytesRead.set(bytesRead);
     }
 
     protected static void printConsumerProgress(int id,
                                                 long bytesRead,
                                                 long lastBytesRead,
-                                                long messagesRead,
-                                                long lastMessagesRead,
+                                                long recordsRead,
+                                                long lastRecordsRead,
                                                 long startMs,
                                                 long endMs,
                                                 SimpleDateFormat dateFormat,
                                                 long joinTimeMsInSingleRound) {
-        printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, 
lastMessagesRead, startMs, endMs, dateFormat);
-        printExtendedProgress(bytesRead, lastBytesRead, messagesRead, 
lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
+        printBasicProgress(id, bytesRead, lastBytesRead, recordsRead, 
lastRecordsRead, startMs, endMs, dateFormat);
+        printExtendedProgress(bytesRead, lastBytesRead, recordsRead, 
lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound);
         System.out.println();
     }
 
     private static void printBasicProgress(int id,
                                            long bytesRead,
                                            long lastBytesRead,
-                                           long messagesRead,
-                                           long lastMessagesRead,
+                                           long recordsRead,
+                                           long lastRecordsRead,
                                            long startMs,
                                            long endMs,
                                            SimpleDateFormat dateFormat) {
@@ -205,25 +206,25 @@ public class ConsumerPerformance {
         double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
         double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 
1024);
         double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
-        double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / 
elapsedMs) * 1000.0;
+        double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / 
elapsedMs) * 1000.0;
         System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", 
dateFormat.format(endMs), id,
-            totalMbRead, intervalMbPerSec, messagesRead, 
intervalMessagesPerSec);
+            totalMbRead, intervalMbPerSec, recordsRead, intervalRecordsPerSec);
     }
 
     private static void printExtendedProgress(long bytesRead,
                                               long lastBytesRead,
-                                              long messagesRead,
-                                              long lastMessagesRead,
+                                              long recordsRead,
+                                              long lastRecordsRead,
                                               long startMs,
                                               long endMs,
                                               long joinTimeMsInSingleRound) {
         long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
         double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 
1024);
-        long intervalMessagesRead = messagesRead - lastMessagesRead;
+        long intervalRecordsRead = recordsRead - lastRecordsRead;
         double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * 
intervalMbRead / fetchTimeMs;
-        double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * 
intervalMessagesRead / fetchTimeMs;
+        double intervalRecordsPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * 
intervalRecordsRead / fetchTimeMs;
         System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
-            fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
+            fetchTimeMs, intervalMbPerSec, intervalRecordsPerSec);
     }
 
     public static class ConsumerPerfRebListener implements 
ConsumerRebalanceListener {
@@ -256,13 +257,18 @@ public class ConsumerPerformance {
         private final OptionSpec<String> includeOpt;
         private final OptionSpec<String> groupIdOpt;
         private final OptionSpec<Integer> fetchSizeOpt;
+        private final OptionSpec<String> commandPropertiesOpt;
         private final OptionSpec<Void> resetBeginningOffsetOpt;
         private final OptionSpec<Integer> socketBufferSizeOpt;
+        @Deprecated(since = "4.2", forRemoval = true)
         private final OptionSpec<String> consumerConfigOpt;
+        private final OptionSpec<String> commandConfigOpt;
         private final OptionSpec<Void> printMetricsOpt;
         private final OptionSpec<Void> showDetailedStatsOpt;
         private final OptionSpec<Long> recordFetchTimeoutOpt;
+        @Deprecated(since = "4.2", forRemoval = true)
         private final OptionSpec<Long> numMessagesOpt;
+        private final OptionSpec<Long> numRecordsOpt;
         private final OptionSpec<Long> reportingIntervalOpt;
         private final OptionSpec<String> dateFormatOpt;
         private final OptionSpec<Void> hideHeaderOpt;
@@ -291,26 +297,41 @@ public class ConsumerPerformance {
                 .describedAs("size")
                 .ofType(Integer.class)
                 .defaultsTo(1024 * 1024);
+            commandPropertiesOpt = parser.accepts("command-property", "Kafka 
consumer related configuration properties like client.id. " +
+                    "These configs take precedence over those passed via 
--command-config or --consumer.config.")
+                .withRequiredArg()
+                .describedAs("prop1=val1")
+                .ofType(String.class);
             resetBeginningOffsetOpt = parser.accepts("from-latest", "If the 
consumer does not already have an established " +
-                "offset to consume from, start with the latest message present 
in the log rather than the earliest message.");
+                "offset to consume from, start with the latest record present 
in the log rather than the earliest record.");
             socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The 
size of the tcp RECV size.")
                 .withRequiredArg()
                 .describedAs("size")
                 .ofType(Integer.class)
                 .defaultsTo(2 * 1024 * 1024);
-            consumerConfigOpt = parser.accepts("consumer.config", "Consumer 
config properties file.")
+            consumerConfigOpt = parser.accepts("consumer.config", 
"(DEPRECATED) Consumer config properties file. " +
+                            "This option will be removed in a future version. 
Use --command-config instead.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+            commandConfigOpt = parser.accepts("command-config", "Config 
properties file.")
                 .withRequiredArg()
                 .describedAs("config file")
                 .ofType(String.class);
             printMetricsOpt = parser.accepts("print-metrics", "Print out the 
metrics.");
             showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If 
set, stats are reported for each reporting " +
-                "interval as configured by reporting-interval");
+                "interval as configured by reporting-interval.");
             recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum 
allowed time in milliseconds between returned records.")
                 .withOptionalArg()
                 .describedAs("milliseconds")
                 .ofType(Long.class)
                 .defaultsTo(10_000L);
-            numMessagesOpt = parser.accepts("messages", "REQUIRED: The number 
of messages to consume.")
+            numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The 
number of records to consume. " +
+                            "This option will be removed in a future version. 
Use --num-records instead.")
+                .withRequiredArg()
+                .describedAs("count")
+                .ofType(Long.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The 
number of records to consume.")
                 .withRequiredArg()
                 .describedAs("count")
                 .ofType(Long.class);
@@ -326,7 +347,7 @@ public class ConsumerPerformance {
                 .describedAs("date format")
                 .ofType(String.class)
                 .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
-            hideHeaderOpt = parser.accepts("hide-header", "If set, skips 
printing the header for the stats");
+            hideHeaderOpt = parser.accepts("hide-header", "If set, skips 
printing the header for the stats.");
             try {
                 options = parser.parse(args);
             } catch (OptionException e) {
@@ -335,8 +356,19 @@ public class ConsumerPerformance {
             }
             if (options != null) {
                 CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is 
used to verify the consumer performance.");
-                CommandLineUtils.checkRequiredArgs(parser, options, 
numMessagesOpt);
+                CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
                 CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, 
includeOpt);
+
+                CommandLineUtils.checkOneOfArgs(parser, options, 
numMessagesOpt, numRecordsOpt);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
consumerConfigOpt, commandConfigOpt);
+
+                if (options.has(numMessagesOpt)) {
+                    System.out.println("Warning: --messages is deprecated. Use 
--num-records instead.");
+                }
+
+                if (options.has(consumerConfigOpt)) {
+                    System.out.println("Warning: --consumer.config is 
deprecated. Use --command-config instead.");
+                }
             }
         }
 
@@ -348,10 +380,23 @@ public class ConsumerPerformance {
             return options.valueOf(bootstrapServerOpt);
         }
 
+        private Properties readProps(List<String> commandProperties, String 
commandConfigFile) throws IOException {
+            Properties props = commandConfigFile != null
+                    ? Utils.loadProps(commandConfigFile)
+                    : new Properties();
+            props.putAll(parseKeyValueArgs(commandProperties));
+            return props;
+        }
+
         public Properties props() throws IOException {
-            Properties props = (options.has(consumerConfigOpt))
-                ? Utils.loadProps(options.valueOf(consumerConfigOpt))
-                : new Properties();
+            List<String> commandProperties = 
options.valuesOf(commandPropertiesOpt);
+            String commandConfigFile;
+            if (options.has(consumerConfigOpt)) {
+                commandConfigFile = options.valueOf(consumerConfigOpt);
+            } else {
+                commandConfigFile = options.valueOf(commandConfigOpt);
+            }
+            Properties props = readProps(commandProperties, commandConfigFile);
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerHostsAndPorts());
             props.put(ConsumerConfig.GROUP_ID_CONFIG, 
options.valueOf(groupIdOpt));
             props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
options.valueOf(socketBufferSizeOpt).toString());
@@ -378,8 +423,10 @@ public class ConsumerPerformance {
                     : Optional.empty();
         }
 
-        public long numMessages() {
-            return options.valueOf(numMessagesOpt);
+        public long numRecords() {
+            return options.has(numMessagesOpt)
+                    ? options.valueOf(numMessagesOpt)
+                    : options.valueOf(numRecordsOpt);
         }
 
         public long reportingIntervalMs() {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
index 5d6179efaeb..51c66704668 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
@@ -55,6 +55,7 @@ import joptsimple.OptionException;
 import joptsimple.OptionSpec;
 
 import static joptsimple.util.RegexMatcher.regex;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
 
 public class ShareConsumerPerformance {
     private static final Logger LOG = 
LoggerFactory.getLogger(ShareConsumerPerformance.class);
@@ -67,7 +68,7 @@ public class ShareConsumerPerformance {
         try {
             LOG.info("Starting share consumer/consumers...");
             ShareConsumerPerfOptions options = new 
ShareConsumerPerfOptions(args);
-            AtomicLong totalMessagesRead = new AtomicLong(0);
+            AtomicLong totalRecordsRead = new AtomicLong(0);
             AtomicLong totalBytesRead = new AtomicLong(0);
 
             if (!options.hideHeader())
@@ -78,7 +79,7 @@ public class ShareConsumerPerformance {
                 
shareConsumers.add(shareConsumerCreator.apply(options.props()));
             }
             long startMs = System.currentTimeMillis();
-            consume(shareConsumers, options, totalMessagesRead, 
totalBytesRead, startMs);
+            consume(shareConsumers, options, totalRecordsRead, totalBytesRead, 
startMs);
             long endMs = System.currentTimeMillis();
 
             List<Map<MetricName, ? extends Metric>> shareConsumersMetrics = 
new ArrayList<>();
@@ -93,7 +94,7 @@ public class ShareConsumerPerformance {
             // Print final stats for share group.
             double elapsedSec = (endMs - startMs) / 1_000.0;
             long fetchTimeInMs = endMs - startMs;
-            printStats(totalBytesRead.get(), totalMessagesRead.get(), 
elapsedSec, fetchTimeInMs, startMs, endMs,
+            printStats(totalBytesRead.get(), totalRecordsRead.get(), 
elapsedSec, fetchTimeInMs, startMs, endMs,
                     options.dateFormat(), -1);
 
             shareConsumersMetrics.forEach(ToolsUtils::printMetrics);
@@ -113,15 +114,15 @@ public class ShareConsumerPerformance {
 
     private static void consume(List<ShareConsumer<byte[], byte[]>> 
shareConsumers,
                                 ShareConsumerPerfOptions options,
-                                AtomicLong totalMessagesRead,
+                                AtomicLong totalRecordsRead,
                                 AtomicLong totalBytesRead,
                                 long startMs) throws ExecutionException, 
InterruptedException {
-        long numMessages = options.numMessages();
+        long numRecords = options.numRecords();
         long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
         shareConsumers.forEach(shareConsumer -> 
shareConsumer.subscribe(options.topic()));
 
         // Now start the benchmark.
-        AtomicLong messagesRead = new AtomicLong(0);
+        AtomicLong recordsRead = new AtomicLong(0);
         AtomicLong bytesRead = new AtomicLong(0);
         List<ShareConsumerConsumption> shareConsumersConsumptionDetails = new 
ArrayList<>();
 
@@ -133,7 +134,7 @@ public class ShareConsumerPerformance {
             ShareConsumerConsumption shareConsumerConsumption = new 
ShareConsumerConsumption(0, 0);
             futures.add(executorService.submit(() -> {
                 try {
-                    
consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead, 
bytesRead, options,
+                    
consumeRecordsForSingleShareConsumer(shareConsumers.get(index), recordsRead, 
bytesRead, options,
                         shareConsumerConsumption, index + 1);
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
@@ -171,22 +172,22 @@ public class ShareConsumerPerformance {
                 // Print stats for share consumer.
                 double elapsedSec = (endMs - startMs) / 1_000.0;
                 long fetchTimeInMs = endMs - startMs;
-                long messagesReadByConsumer = 
shareConsumersConsumptionDetails.get(index).messagesConsumed();
+                long recordsReadByConsumer = 
shareConsumersConsumptionDetails.get(index).recordsConsumed();
                 long bytesReadByConsumer = 
shareConsumersConsumptionDetails.get(index).bytesConsumed();
-                printStats(bytesReadByConsumer, messagesReadByConsumer, 
elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1);
+                printStats(bytesReadByConsumer, recordsReadByConsumer, 
elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1);
             }
         }
 
-        if (messagesRead.get() < numMessages) {
-            System.out.printf("WARNING: Exiting before consuming the expected 
number of messages: timeout (%d ms) exceeded. " +
+        if (recordsRead.get() < numRecords) {
+            System.out.printf("WARNING: Exiting before consuming the expected 
number of records: timeout (%d ms) exceeded. " +
                     "You can use the --timeout option to increase the 
timeout.%n", recordFetchTimeoutMs);
         }
-        totalMessagesRead.set(messagesRead.get());
+        totalRecordsRead.set(recordsRead.get());
         totalBytesRead.set(bytesRead.get());
     }
 
-    private static void 
consumeMessagesForSingleShareConsumer(ShareConsumer<byte[], byte[]> 
shareConsumer,
-                                                              AtomicLong 
totalMessagesRead,
+    private static void 
consumeRecordsForSingleShareConsumer(ShareConsumer<byte[], byte[]> 
shareConsumer,
+                                                              AtomicLong 
totalRecordsRead,
                                                               AtomicLong 
totalBytesRead,
                                                               
ShareConsumerPerfOptions options,
                                                               
ShareConsumerConsumption shareConsumerConsumption,
@@ -197,17 +198,17 @@ public class ShareConsumerPerformance {
         long lastReportTimeMs = currentTimeMs;
 
         long lastBytesRead = 0L;
-        long lastMessagesRead = 0L;
-        long messagesReadByConsumer = 0L;
+        long lastRecordsRead = 0L;
+        long recordsReadByConsumer = 0L;
         long bytesReadByConsumer = 0L;
-        while (totalMessagesRead.get() < options.numMessages() && 
currentTimeMs - lastConsumedTimeMs <= options.recordFetchTimeoutMs()) {
+        while (totalRecordsRead.get() < options.numRecords() && currentTimeMs 
- lastConsumedTimeMs <= options.recordFetchTimeoutMs()) {
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(100));
             currentTimeMs = System.currentTimeMillis();
             if (!records.isEmpty())
                 lastConsumedTimeMs = currentTimeMs;
             for (ConsumerRecord<byte[], byte[]> record : records) {
-                messagesReadByConsumer += 1;
-                totalMessagesRead.addAndGet(1);
+                recordsReadByConsumer += 1;
+                totalRecordsRead.addAndGet(1);
                 if (record.key() != null) {
                     bytesReadByConsumer += record.key().length;
                     totalBytesRead.addAndGet(record.key().length);
@@ -218,13 +219,13 @@ public class ShareConsumerPerformance {
                 }
                 if (currentTimeMs - lastReportTimeMs >= 
options.reportingIntervalMs()) {
                     if (options.showDetailedStats())
-                        printShareConsumerProgress(bytesReadByConsumer, 
lastBytesRead, messagesReadByConsumer, lastMessagesRead,
+                        printShareConsumerProgress(bytesReadByConsumer, 
lastBytesRead, recordsReadByConsumer, lastRecordsRead,
                                 lastReportTimeMs, currentTimeMs, dateFormat, 
index);
                     lastReportTimeMs = currentTimeMs;
-                    lastMessagesRead = messagesReadByConsumer;
+                    lastRecordsRead = recordsReadByConsumer;
                     lastBytesRead = bytesReadByConsumer;
                 }
-                
shareConsumerConsumption.updateMessagesConsumed(messagesReadByConsumer);
+                
shareConsumerConsumption.updateRecordsConsumed(recordsReadByConsumer);
                 
shareConsumerConsumption.updateBytesConsumed(bytesReadByConsumer);
             }
         }
@@ -232,8 +233,8 @@ public class ShareConsumerPerformance {
 
     protected static void printShareConsumerProgress(long bytesRead,
                                                 long lastBytesRead,
-                                                long messagesRead,
-                                                long lastMessagesRead,
+                                                long recordsRead,
+                                                long lastRecordsRead,
                                                 long startMs,
                                                 long endMs,
                                                 SimpleDateFormat dateFormat,
@@ -242,18 +243,18 @@ public class ShareConsumerPerformance {
         double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
         double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 
1024);
         double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
-        double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / 
elapsedMs) * 1000.0;
+        double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / 
elapsedMs) * 1000.0;
         long fetchTimeMs = endMs - startMs;
 
         System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d for share consumer 
%d", dateFormat.format(startMs), dateFormat.format(endMs),
-            totalMbRead, intervalMbPerSec, intervalMessagesPerSec, 
messagesRead, fetchTimeMs, index);
+            totalMbRead, intervalMbPerSec, intervalRecordsPerSec, recordsRead, 
fetchTimeMs, index);
         System.out.println();
     }
 
     // Prints stats for both share consumer and share group. For share group, 
index is -1. For share consumer,
     // index is >= 1.
     private static void printStats(long bytesRead,
-                                   long messagesRead,
+                                   long recordsRead,
                                    double elapsedSec,
                                    long fetchTimeInMs,
                                    long startMs,
@@ -268,8 +269,8 @@ public class ShareConsumerPerformance {
                     dateFormat.format(endMs),
                     totalMbRead,
                     totalMbRead / elapsedSec,
-                    messagesRead / elapsedSec,
-                    messagesRead,
+                    recordsRead / elapsedSec,
+                    recordsRead,
                     fetchTimeInMs
             );
             return;
@@ -279,8 +280,8 @@ public class ShareConsumerPerformance {
                 dateFormat.format(endMs),
                 totalMbRead,
                 totalMbRead / elapsedSec,
-                messagesRead / elapsedSec,
-                messagesRead,
+                recordsRead / elapsedSec,
+                recordsRead,
                 fetchTimeInMs
         );
     }
@@ -290,12 +291,17 @@ public class ShareConsumerPerformance {
         private final OptionSpec<String> topicOpt;
         private final OptionSpec<String> groupIdOpt;
         private final OptionSpec<Integer> fetchSizeOpt;
+        private final OptionSpec<String> commandPropertiesOpt;
         private final OptionSpec<Integer> socketBufferSizeOpt;
+        @Deprecated(since = "4.2", forRemoval = true)
         private final OptionSpec<String> consumerConfigOpt;
+        private final OptionSpec<String> commandConfigOpt;
         private final OptionSpec<Void> printMetricsOpt;
         private final OptionSpec<Void> showDetailedStatsOpt;
         private final OptionSpec<Long> recordFetchTimeoutOpt;
+        @Deprecated(since = "4.2", forRemoval = true)
         private final OptionSpec<Long> numMessagesOpt;
+        private final OptionSpec<Long> numRecordsOpt;
         private final OptionSpec<Long> reportingIntervalOpt;
         private final OptionSpec<String> dateFormatOpt;
         private final OptionSpec<Void> hideHeaderOpt;
@@ -322,24 +328,39 @@ public class ShareConsumerPerformance {
                     .describedAs("size")
                     .ofType(Integer.class)
                     .defaultsTo(1024 * 1024);
+            commandPropertiesOpt = parser.accepts("command-property", "Kafka 
share consumer related configuration properties like client.id. " +
+                            "These configs take precedence over those passed 
via --command-config or --consumer.config.")
+                    .withRequiredArg()
+                    .describedAs("prop1=val1")
+                    .ofType(String.class);
             socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The 
size of the tcp RECV size.")
                     .withRequiredArg()
                     .describedAs("size")
                     .ofType(Integer.class)
                     .defaultsTo(2 * 1024 * 1024);
-            consumerConfigOpt = parser.accepts("consumer.config", "Share 
consumer config properties file.")
+            consumerConfigOpt = parser.accepts("consumer.config", 
"(DEPRECATED) Share consumer config properties file. " +
+                    "This option will be removed in a future version. Use 
--command-config instead.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            commandConfigOpt = parser.accepts("command-config", "Config 
properties file.")
                     .withRequiredArg()
                     .describedAs("config file")
                     .ofType(String.class);
             printMetricsOpt = parser.accepts("print-metrics", "Print out the 
metrics.");
             showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If 
set, stats are reported for each reporting " +
-                    "interval as configured by reporting-interval");
+                    "interval as configured by reporting-interval.");
             recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum 
allowed time in milliseconds between returned records.")
                     .withOptionalArg()
                     .describedAs("milliseconds")
                     .ofType(Long.class)
                     .defaultsTo(10_000L);
-            numMessagesOpt = parser.accepts("messages", "REQUIRED: The number 
of messages to consume.")
+            numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The 
number of records to consume. " +
+                            "This option will be removed in a future version. 
Use --num-records instead.")
+                    .withRequiredArg()
+                    .describedAs("count")
+                    .ofType(Long.class);
+            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The 
number of records to consume.")
                     .withRequiredArg()
                     .describedAs("count")
                     .ofType(Long.class);
@@ -355,7 +376,7 @@ public class ShareConsumerPerformance {
                     .describedAs("date format")
                     .ofType(String.class)
                     .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
-            hideHeaderOpt = parser.accepts("hide-header", "If set, skips 
printing the header for the stats");
+            hideHeaderOpt = parser.accepts("hide-header", "If set, skips 
printing the header for the stats.");
             numThreadsOpt = parser.accepts("threads", "The number of share 
consumers to use for sharing the load.")
                     .withRequiredArg()
                     .describedAs("count")
@@ -371,7 +392,18 @@ public class ShareConsumerPerformance {
             }
             if (options != null) {
                 CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is 
used to verify the share consumer performance.");
-                CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, 
numMessagesOpt);
+                CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, 
bootstrapServerOpt);
+
+                CommandLineUtils.checkOneOfArgs(parser, options, 
numMessagesOpt, numRecordsOpt);
+                CommandLineUtils.checkInvalidArgs(parser, options, 
consumerConfigOpt, commandConfigOpt);
+
+                if (options.has(numMessagesOpt)) {
+                    System.out.println("Warning: --messages is deprecated. Use 
--num-records instead.");
+                }
+
+                if (options.has(consumerConfigOpt)) {
+                    System.out.println("Warning: --consumer.config is 
deprecated. Use --command-config instead.");
+                }
             }
         }
 
@@ -383,10 +415,23 @@ public class ShareConsumerPerformance {
             return options.valueOf(bootstrapServerOpt);
         }
 
-        public Properties props() throws IOException {
-            Properties props = (options.has(consumerConfigOpt))
-                    ? Utils.loadProps(options.valueOf(consumerConfigOpt))
+        private Properties readProps(List<String> commandProperties, String 
commandConfigFile) throws IOException {
+            Properties props = commandConfigFile != null
+                    ? Utils.loadProps(commandConfigFile)
                     : new Properties();
+            props.putAll(parseKeyValueArgs(commandProperties));
+            return props;
+        }
+
+        public Properties props() throws IOException {
+            List<String> commandProperties = 
options.valuesOf(commandPropertiesOpt);
+            String commandConfigFile;
+            if (options.has(consumerConfigOpt)) {
+                commandConfigFile = options.valueOf(consumerConfigOpt);
+            } else {
+                commandConfigFile = options.valueOf(commandConfigOpt);
+            }
+            Properties props = readProps(commandProperties, commandConfigFile);
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerHostsAndPorts());
             props.put(ConsumerConfig.GROUP_ID_CONFIG, 
options.valueOf(groupIdOpt));
             props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
options.valueOf(socketBufferSizeOpt).toString());
@@ -403,8 +448,10 @@ public class ShareConsumerPerformance {
             return Set.of(options.valueOf(topicOpt));
         }
 
-        public long numMessages() {
-            return options.valueOf(numMessagesOpt);
+        public long numRecords() {
+            return options.has(numMessagesOpt)
+                    ? options.valueOf(numMessagesOpt)
+                    : options.valueOf(numRecordsOpt);
         }
 
         public int threads() {
@@ -439,26 +486,26 @@ public class ShareConsumerPerformance {
         }
     }
 
-    // Helper class to know the final messages and bytes consumer by share 
consumer at the end of consumption.
+    // Helper class to know the final records and bytes consumed by share 
consumer at the end of consumption.
     private static class ShareConsumerConsumption {
-        private long messagesConsumed;
+        private long recordsConsumed;
         private long bytesConsumed;
 
-        public ShareConsumerConsumption(long messagesConsumed, long 
bytesConsumed) {
-            this.messagesConsumed = messagesConsumed;
+        public ShareConsumerConsumption(long recordsConsumed, long 
bytesConsumed) {
+            this.recordsConsumed = recordsConsumed;
             this.bytesConsumed = bytesConsumed;
         }
 
-        public long messagesConsumed() {
-            return messagesConsumed;
+        public long recordsConsumed() {
+            return recordsConsumed;
         }
 
         public long bytesConsumed() {
             return bytesConsumed;
         }
 
-        public void updateMessagesConsumed(long messagesConsumed) {
-            this.messagesConsumed = messagesConsumed;
+        public void updateRecordsConsumed(long recordsConsumed) {
+            this.recordsConsumed = recordsConsumed;
         }
 
         public void updateBytesConsumed(long bytesConsumed) {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
index df6c3a93966..497deb7808d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -74,7 +74,7 @@ public class ConsumerPerformanceTest {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
-            "--messages", "10",
+            "--num-records", "10",
             "--print-metrics"
         };
 
@@ -82,15 +82,64 @@ public class ConsumerPerformanceTest {
 
         assertEquals("localhost:9092", config.brokerHostsAndPorts());
         assertTrue(config.topic().get().contains("test"));
-        assertEquals(10, config.numMessages());
+        assertEquals(10, config.numRecords());
     }
 
     @Test
-    public void testConfigWithUnrecognizedOption() {
+    public void testBootstrapServerNotPresent() {
+        String[] args = new String[]{
+            "--topic", "test"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() ->
+                new ConsumerPerformance.ConsumerPerfOptions(args));
+        assertTrue(err.contains("Missing required argument 
\"[bootstrap-server]\""));
+    }
+
+    @Test
+    public void testNumOfRecordsNotPresent() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() ->
+                new ConsumerPerformance.ConsumerPerfOptions(args));
+        assertTrue(err.contains("Exactly one of the following arguments is 
required:"));
+    }
+
+    @Test
+    public void testMessagesDeprecated() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+        assertEquals(10, config.numRecords());
+    }
+
+    @Test
+    public void testNumOfRecordsWithMessagesPresent() {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
             "--messages", "10",
+            "--num-records", "20"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() ->
+                new ConsumerPerformance.ConsumerPerfOptions(args));
+        assertTrue(err.contains("Exactly one of the following arguments is 
required"));
+    }
+
+    @Test
+    public void testConfigWithUnrecognizedOption() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "10",
             "--new-consumer"
         };
 
@@ -104,14 +153,14 @@ public class ConsumerPerformanceTest {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--include", "test.*",
-            "--messages", "10"
+            "--num-records", "10"
         };
 
         ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
 
         assertEquals("localhost:9092", config.brokerHostsAndPorts());
         assertTrue(config.include().get().toString().contains("test.*"));
-        assertEquals(10, config.numMessages());
+        assertEquals(10, config.numRecords());
     }
 
     @Test
@@ -120,7 +169,7 @@ public class ConsumerPerformanceTest {
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
             "--include", "test.*",
-            "--messages", "10"
+            "--num-records", "10"
         };
 
         String err = ToolsTestUtils.captureStandardErr(() -> new 
ConsumerPerformance.ConsumerPerfOptions(args));
@@ -132,7 +181,7 @@ public class ConsumerPerformanceTest {
     public void testConfigWithoutTopicAndInclude() {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
-            "--messages", "10"
+            "--num-records", "10"
         };
 
         String err = ToolsTestUtils.captureStandardErr(() -> new 
ConsumerPerformance.ConsumerPerfOptions(args));
@@ -140,9 +189,36 @@ public class ConsumerPerformanceTest {
         assertTrue(err.contains("Exactly one of the following arguments is 
required: [topic], [include]"));
     }
 
+    @Test
+    public void testCommandProperty() throws IOException {
+        Path configPath = 
tempDir.resolve("test_command_property_consumer_perf.conf");
+        Files.deleteIfExists(configPath);
+        File tempFile = Files.createFile(configPath).toFile();
+        try (PrintWriter output = new 
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+            output.println("client.id=consumer-1");
+            output.flush();
+        }
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "10",
+            "--command-property", "client.id=consumer-2",
+            "--command-config", tempFile.getAbsolutePath(),
+            "--command-property", "prop=val"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("consumer-2", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+        assertEquals("val", config.props().getProperty("prop"));
+    }
+
     @Test
     public void testClientIdOverride() throws IOException {
-        File tempFile = 
Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();
+        Path configPath = 
tempDir.resolve("test_client_id_override_consumer_perf.conf");
+        Files.deleteIfExists(configPath);
+        File tempFile = Files.createFile(configPath).toFile();
         try (PrintWriter output = new 
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
             output.println("client.id=consumer-1");
             output.flush();
@@ -151,7 +227,29 @@ public class ConsumerPerformanceTest {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
-            "--messages", "10",
+            "--num-records", "10",
+            "--command-config", tempFile.getAbsolutePath()
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("consumer-1", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testConsumerConfigDeprecated() throws IOException {
+        Path configPath = 
tempDir.resolve("test_consumer_config_deprecated_consumer_perf.conf");
+        Files.deleteIfExists(configPath);
+        File tempFile = Files.createFile(configPath).toFile();
+        try (PrintWriter output = new 
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+            output.println("client.id=consumer-1");
+            output.flush();
+        }
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "10",
             "--consumer.config", tempFile.getAbsolutePath()
         };
 
@@ -160,12 +258,28 @@ public class ConsumerPerformanceTest {
         assertEquals("consumer-1", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
     }
 
+    @Test
+    public void testCommandConfigWithConsumerConfigPresent() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "10",
+            "--consumer.config", "some-path",
+            "--command-config", "some-path"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() ->
+                new ConsumerPerformance.ConsumerPerfOptions(args));
+        assertTrue(err.contains(String.format("Option \"%s\" can't be used 
with option \"%s\"",
+                "[consumer.config]", "[command-config]")));
+    }
+
     @Test
     public void testDefaultClientId() throws IOException {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
-            "--messages", "10"
+            "--num-records", "10"
         };
 
         ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
@@ -178,7 +292,7 @@ public class ConsumerPerformanceTest {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
-            "--messages", "0",
+            "--num-records", "0",
             "--print-metrics"
         };
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java
index a22a97f8211..6cffde627bb 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java
@@ -67,7 +67,7 @@ public class ShareConsumerPerformanceTest {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
-            "--messages", "10",
+            "--num-records", "10",
             "--print-metrics"
         };
 
@@ -75,15 +75,65 @@ public class ShareConsumerPerformanceTest {
 
         assertEquals("localhost:9092", config.brokerHostsAndPorts());
         assertTrue(config.topic().contains("test"));
-        assertEquals(10, config.numMessages());
+        assertEquals(10, config.numRecords());
     }
 
     @Test
-    public void testConfigWithUnrecognizedOption() {
+    public void testBootstrapServerNotPresent() {
+        String[] args = new String[]{
+            "--topic", "test"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() ->
+                new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
+        assertTrue(err.contains("Missing required argument 
\"[bootstrap-server]\""));
+    }
+
+    @Test
+    public void testNumOfRecordsNotPresent() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() ->
+                new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
+        assertTrue(err.contains("Exactly one of the following arguments is 
required:"));
+    }
+
+    @Test
+    public void testMessagesDeprecated() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10"
+        };
+
+        ShareConsumerPerformance.ShareConsumerPerfOptions config =
+                new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
+        assertEquals(10, config.numRecords());
+    }
+
+    @Test
+    public void testNumOfRecordsWithMessagesPresent() {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
             "--messages", "10",
+            "--num-records", "20"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() ->
+                new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
+        assertTrue(err.contains("Exactly one of the following arguments is 
required"));
+    }
+
+    @Test
+    public void testConfigWithUnrecognizedOption() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "10",
             "--new-share-consumer"
         };
 
@@ -92,9 +142,36 @@ public class ShareConsumerPerformanceTest {
         assertTrue(err.contains("new-share-consumer is not a recognized 
option"));
     }
 
+    @Test
+    public void testCommandProperty() throws IOException {
+        Path configPath = 
tempDir.resolve("test_command_property_share_consumer_perf.conf");
+        Files.deleteIfExists(configPath);
+        File tempFile = Files.createFile(configPath).toFile();
+        try (PrintWriter output = new 
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+            output.println("client.id=consumer-1");
+            output.flush();
+        }
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "10",
+            "--command-property", "client.id=consumer-2",
+            "--command-config", tempFile.getAbsolutePath(),
+            "--command-property", "prop=val"
+        };
+
+        ShareConsumerPerformance.ShareConsumerPerfOptions config = new 
ShareConsumerPerformance.ShareConsumerPerfOptions(args);
+
+        assertEquals("consumer-2", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+        assertEquals("val", config.props().getProperty("prop"));
+    }
+
     @Test
     public void testClientIdOverride() throws IOException {
-        File tempFile = 
Files.createFile(tempDir.resolve("test_share_consumer_config.conf")).toFile();
+        Path configPath = 
tempDir.resolve("test_client_id_override_share_consumer_perf.conf");
+        Files.deleteIfExists(configPath);
+        File tempFile = Files.createFile(configPath).toFile();
         try (PrintWriter output = new 
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
             output.println("client.id=share-consumer-1");
             output.flush();
@@ -103,8 +180,8 @@ public class ShareConsumerPerformanceTest {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
-            "--messages", "10",
-            "--consumer.config", tempFile.getAbsolutePath()
+            "--num-records", "10",
+            "--command-config", tempFile.getAbsolutePath()
         };
 
         ShareConsumerPerformance.ShareConsumerPerfOptions config = new 
ShareConsumerPerformance.ShareConsumerPerfOptions(args);
@@ -112,12 +189,51 @@ public class ShareConsumerPerformanceTest {
         assertEquals("share-consumer-1", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
     }
 
+    @Test
+    public void testConsumerConfigDeprecated() throws IOException {
+        Path configPath = 
tempDir.resolve("test_consumer_config_deprecated_share_consumer_perf.conf");
+        Files.deleteIfExists(configPath);
+        File tempFile = Files.createFile(configPath).toFile();
+        try (PrintWriter output = new 
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+            output.println("client.id=share-consumer-1");
+            output.flush();
+        }
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "10",
+            "--consumer.config", tempFile.getAbsolutePath()
+        };
+
+        ShareConsumerPerformance.ShareConsumerPerfOptions config =
+                new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
+
+        assertEquals("share-consumer-1", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testCommandConfigWithConsumerConfigPresent() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--num-records", "10",
+            "--consumer.config", "some-path",
+            "--command-config", "some-path"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() ->
+                new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
+        assertTrue(err.contains(String.format("Option \"%s\" can't be used 
with option \"%s\"",
+                "[consumer.config]", "[command-config]")));
+    }
+
     @Test
     public void testDefaultClientId() throws IOException {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
-            "--messages", "10"
+            "--num-records", "10"
         };
 
         ShareConsumerPerformance.ShareConsumerPerfOptions config = new 
ShareConsumerPerformance.ShareConsumerPerfOptions(args);
@@ -130,7 +246,7 @@ public class ShareConsumerPerformanceTest {
         String[] args = new String[]{
             "--bootstrap-server", "localhost:9092",
             "--topic", "test",
-            "--messages", "0",
+            "--num-records", "0",
             "--print-metrics"
         };
 

Reply via email to