This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dbe9d34e474 KAFKA-19624: Improve consistency of command-line arguments
for consumer performance tests (#20385)
dbe9d34e474 is described below
commit dbe9d34e47452059b78a73f782c4911ec7e366f7
Author: ally heev <[email protected]>
AuthorDate: Tue Sep 23 14:31:40 2025 +0530
KAFKA-19624: Improve consistency of command-line arguments for consumer
performance tests (#20385)
Resolves https://issues.apache.org/jira/browse/KAFKA-19624
Reviewers: @brandboat, @AndrewJSchofield, @m1a2st
---
.../services/performance/consumer_performance.py | 15 ++-
.../performance/share_consumer_performance.py | 17 ++-
.../apache/kafka/tools/ConsumerPerformance.java | 133 +++++++++++++------
.../kafka/tools/ShareConsumerPerformance.java | 145 ++++++++++++++-------
.../kafka/tools/ConsumerPerformanceTest.java | 136 +++++++++++++++++--
.../kafka/tools/ShareConsumerPerformanceTest.java | 132 +++++++++++++++++--
6 files changed, 458 insertions(+), 120 deletions(-)
diff --git a/tests/kafkatest/services/performance/consumer_performance.py
b/tests/kafkatest/services/performance/consumer_performance.py
index 28086e82818..aa414e5a89f 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService):
"socket-buffer-size", "The size of the tcp RECV size."
"new-consumer", "Use the new consumer implementation."
- "consumer.config", "Consumer config properties file."
+ "command-config", "Config properties file."
"""
# Root directory for persistent output
@@ -83,10 +83,14 @@ class ConsumerPerformanceService(PerformanceService):
def args(self, version):
"""Dictionary of arguments used to start the Consumer Performance
script."""
args = {
- 'topic': self.topic,
- 'messages': self.messages
+ 'topic': self.topic
}
+ if version.supports_command_config():
+ args['num-records'] = self.messages
+ else:
+ args['messages'] = self.messages
+
if version < V_2_5_0:
args['broker-list'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
else:
@@ -115,7 +119,10 @@ class ConsumerPerformanceService(PerformanceService):
for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value)
- cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
+ if node.version.supports_command_config():
+ cmd += " --command-config %s" %
ConsumerPerformanceService.CONFIG_FILE
+ else:
+ cmd += " --consumer.config %s" %
ConsumerPerformanceService.CONFIG_FILE
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
diff --git a/tests/kafkatest/services/performance/share_consumer_performance.py
b/tests/kafkatest/services/performance/share_consumer_performance.py
index ccb09524580..432d1e1da81 100644
--- a/tests/kafkatest/services/performance/share_consumer_performance.py
+++ b/tests/kafkatest/services/performance/share_consumer_performance.py
@@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService):
"socket-buffer-size", "The size of the tcp RECV size."
- "consumer.config", "Consumer config properties file."
+ "command-config", "Config properties file."
"""
# Root directory for persistent output
@@ -73,16 +73,20 @@ class ShareConsumerPerformanceService(PerformanceService):
for node in self.nodes:
node.version = version
- def args(self):
+ def args(self, version):
"""Dictionary of arguments used to start the Share Consumer
Performance script."""
args = {
'topic': self.topic,
- 'messages': self.messages,
'bootstrap-server':
self.kafka.bootstrap_servers(self.security_config.security_protocol),
'group': self.group,
'timeout': self.timeout
}
+ if version.supports_command_config():
+ args['num-records'] = self.messages
+ else:
+ args['messages'] = self.messages
+
if self.fetch_size is not None:
args['fetch-size'] = self.fetch_size
@@ -97,10 +101,13 @@ class ShareConsumerPerformanceService(PerformanceService):
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\";" %
(get_log4j_config_param(node), get_log4j_config_for_tools(node))
cmd += " %s" % self.path.script("kafka-share-consumer-perf-test.sh",
node)
- for key, value in self.args().items():
+ for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value)
- cmd += " --consumer.config %s" %
ShareConsumerPerformanceService.CONFIG_FILE
+ if node.version.supports_command_config():
+ cmd += " --command-config %s" %
ShareConsumerPerformanceService.CONFIG_FILE
+ else:
+ cmd += " --consumer.config %s" %
ShareConsumerPerformanceService.CONFIG_FILE
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 62def15d324..0334af83aa1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -48,6 +48,7 @@ import joptsimple.OptionException;
import joptsimple.OptionSpec;
import static joptsimple.util.RegexMatcher.regex;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
public class ConsumerPerformance {
private static final Logger LOG =
LoggerFactory.getLogger(ConsumerPerformance.class);
@@ -61,7 +62,7 @@ public class ConsumerPerformance {
try {
LOG.info("Starting consumer...");
ConsumerPerfOptions options = new ConsumerPerfOptions(args);
- AtomicLong totalMessagesRead = new AtomicLong(0);
+ AtomicLong totalRecordsRead = new AtomicLong(0);
AtomicLong totalBytesRead = new AtomicLong(0);
AtomicLong joinTimeMs = new AtomicLong(0);
AtomicLong joinTimeMsInSingleRound = new AtomicLong(0);
@@ -71,14 +72,14 @@ public class ConsumerPerformance {
try (Consumer<byte[], byte[]> consumer =
consumerCreator.apply(options.props())) {
long bytesRead = 0L;
- long messagesRead = 0L;
+ long recordsRead = 0L;
long lastBytesRead = 0L;
- long lastMessagesRead = 0L;
+ long lastRecordsRead = 0L;
long currentTimeMs = System.currentTimeMillis();
long joinStartMs = currentTimeMs;
long startMs = currentTimeMs;
- consume(consumer, options, totalMessagesRead, totalBytesRead,
joinTimeMs,
- bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
+ consume(consumer, options, totalRecordsRead, totalBytesRead,
joinTimeMs,
+ bytesRead, recordsRead, lastBytesRead, lastRecordsRead,
joinStartMs, joinTimeMsInSingleRound);
long endMs = System.currentTimeMillis();
@@ -92,12 +93,12 @@ public class ConsumerPerformance {
options.dateFormat().format(endMs),
totalMbRead,
totalMbRead / elapsedSec,
- totalMessagesRead.get(),
- totalMessagesRead.get() / elapsedSec,
+ totalRecordsRead.get(),
+ totalRecordsRead.get() / elapsedSec,
joinTimeMs.get(),
fetchTimeInMs,
totalMbRead / (fetchTimeInMs / 1000.0),
- totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
+ totalRecordsRead.get() / (fetchTimeInMs / 1000.0)
);
}
@@ -122,16 +123,16 @@ public class ConsumerPerformance {
private static void consume(Consumer<byte[], byte[]> consumer,
ConsumerPerfOptions options,
- AtomicLong totalMessagesRead,
+ AtomicLong totalRecordsRead,
AtomicLong totalBytesRead,
AtomicLong joinTimeMs,
long bytesRead,
- long messagesRead,
+ long recordsRead,
long lastBytesRead,
- long lastMessagesRead,
+ long lastRecordsRead,
long joinStartMs,
AtomicLong joinTimeMsInSingleRound) {
- long numMessages = options.numMessages();
+ long numRecords = options.numRecords();
long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
@@ -149,55 +150,55 @@ public class ConsumerPerformance {
long lastReportTimeMs = currentTimeMs;
long lastConsumedTimeMs = currentTimeMs;
- while (messagesRead < numMessages && currentTimeMs -
lastConsumedTimeMs <= recordFetchTimeoutMs) {
+ while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs
<= recordFetchTimeoutMs) {
ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(100));
currentTimeMs = System.currentTimeMillis();
if (!records.isEmpty())
lastConsumedTimeMs = currentTimeMs;
for (ConsumerRecord<byte[], byte[]> record : records) {
- messagesRead += 1;
+ recordsRead += 1;
if (record.key() != null)
bytesRead += record.key().length;
if (record.value() != null)
bytesRead += record.value().length;
if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
if (showDetailedStats)
- printConsumerProgress(0, bytesRead, lastBytesRead,
messagesRead, lastMessagesRead,
+ printConsumerProgress(0, bytesRead, lastBytesRead,
recordsRead, lastRecordsRead,
lastReportTimeMs, currentTimeMs, dateFormat,
joinTimeMsInSingleRound.get());
joinTimeMsInSingleRound = new AtomicLong(0);
lastReportTimeMs = currentTimeMs;
- lastMessagesRead = messagesRead;
+ lastRecordsRead = recordsRead;
lastBytesRead = bytesRead;
}
}
}
- if (messagesRead < numMessages)
- System.out.printf("WARNING: Exiting before consuming the expected
number of messages: timeout (%d ms) exceeded. " +
+ if (recordsRead < numRecords)
+ System.out.printf("WARNING: Exiting before consuming the expected
number of records: timeout (%d ms) exceeded. " +
"You can use the --timeout option to increase the timeout.%n",
recordFetchTimeoutMs);
- totalMessagesRead.set(messagesRead);
+ totalRecordsRead.set(recordsRead);
totalBytesRead.set(bytesRead);
}
protected static void printConsumerProgress(int id,
long bytesRead,
long lastBytesRead,
- long messagesRead,
- long lastMessagesRead,
+ long recordsRead,
+ long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat,
long joinTimeMsInSingleRound) {
- printBasicProgress(id, bytesRead, lastBytesRead, messagesRead,
lastMessagesRead, startMs, endMs, dateFormat);
- printExtendedProgress(bytesRead, lastBytesRead, messagesRead,
lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
+ printBasicProgress(id, bytesRead, lastBytesRead, recordsRead,
lastRecordsRead, startMs, endMs, dateFormat);
+ printExtendedProgress(bytesRead, lastBytesRead, recordsRead,
lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound);
System.out.println();
}
private static void printBasicProgress(int id,
long bytesRead,
long lastBytesRead,
- long messagesRead,
- long lastMessagesRead,
+ long recordsRead,
+ long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat) {
@@ -205,25 +206,25 @@ public class ConsumerPerformance {
double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 *
1024);
double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
- double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) /
elapsedMs) * 1000.0;
+ double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) /
elapsedMs) * 1000.0;
System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f",
dateFormat.format(endMs), id,
- totalMbRead, intervalMbPerSec, messagesRead,
intervalMessagesPerSec);
+ totalMbRead, intervalMbPerSec, recordsRead, intervalRecordsPerSec);
}
private static void printExtendedProgress(long bytesRead,
long lastBytesRead,
- long messagesRead,
- long lastMessagesRead,
+ long recordsRead,
+ long lastRecordsRead,
long startMs,
long endMs,
long joinTimeMsInSingleRound) {
long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 *
1024);
- long intervalMessagesRead = messagesRead - lastMessagesRead;
+ long intervalRecordsRead = recordsRead - lastRecordsRead;
double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 *
intervalMbRead / fetchTimeMs;
- double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 *
intervalMessagesRead / fetchTimeMs;
+ double intervalRecordsPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 *
intervalRecordsRead / fetchTimeMs;
System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
- fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
+ fetchTimeMs, intervalMbPerSec, intervalRecordsPerSec);
}
public static class ConsumerPerfRebListener implements
ConsumerRebalanceListener {
@@ -256,13 +257,18 @@ public class ConsumerPerformance {
private final OptionSpec<String> includeOpt;
private final OptionSpec<String> groupIdOpt;
private final OptionSpec<Integer> fetchSizeOpt;
+ private final OptionSpec<String> commandPropertiesOpt;
private final OptionSpec<Void> resetBeginningOffsetOpt;
private final OptionSpec<Integer> socketBufferSizeOpt;
+ @Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> consumerConfigOpt;
+ private final OptionSpec<String> commandConfigOpt;
private final OptionSpec<Void> printMetricsOpt;
private final OptionSpec<Void> showDetailedStatsOpt;
private final OptionSpec<Long> recordFetchTimeoutOpt;
+ @Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<Long> numMessagesOpt;
+ private final OptionSpec<Long> numRecordsOpt;
private final OptionSpec<Long> reportingIntervalOpt;
private final OptionSpec<String> dateFormatOpt;
private final OptionSpec<Void> hideHeaderOpt;
@@ -291,26 +297,41 @@ public class ConsumerPerformance {
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(1024 * 1024);
+ commandPropertiesOpt = parser.accepts("command-property", "Kafka
consumer related configuration properties like client.id. " +
+ "These configs take precedence over those passed via
--command-config or --consumer.config.")
+ .withRequiredArg()
+ .describedAs("prop1=val1")
+ .ofType(String.class);
resetBeginningOffsetOpt = parser.accepts("from-latest", "If the
consumer does not already have an established " +
- "offset to consume from, start with the latest message present
in the log rather than the earliest message.");
+ "offset to consume from, start with the latest record present
in the log rather than the earliest record.");
socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The
size of the tcp RECV size.")
.withRequiredArg()
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(2 * 1024 * 1024);
- consumerConfigOpt = parser.accepts("consumer.config", "Consumer
config properties file.")
+ consumerConfigOpt = parser.accepts("consumer.config",
"(DEPRECATED) Consumer config properties file. " +
+ "This option will be removed in a future version.
Use --command-config instead.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
+ commandConfigOpt = parser.accepts("command-config", "Config
properties file.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
printMetricsOpt = parser.accepts("print-metrics", "Print out the
metrics.");
showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If
set, stats are reported for each reporting " +
- "interval as configured by reporting-interval");
+ "interval as configured by reporting-interval.");
recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum
allowed time in milliseconds between returned records.")
.withOptionalArg()
.describedAs("milliseconds")
.ofType(Long.class)
.defaultsTo(10_000L);
- numMessagesOpt = parser.accepts("messages", "REQUIRED: The number
of messages to consume.")
+ numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The
number of records to consume. " +
+ "This option will be removed in a future version.
Use --num-records instead.")
+ .withRequiredArg()
+ .describedAs("count")
+ .ofType(Long.class);
+ numRecordsOpt = parser.accepts("num-records", "REQUIRED: The
number of records to consume.")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
@@ -326,7 +347,7 @@ public class ConsumerPerformance {
.describedAs("date format")
.ofType(String.class)
.defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
- hideHeaderOpt = parser.accepts("hide-header", "If set, skips
printing the header for the stats");
+ hideHeaderOpt = parser.accepts("hide-header", "If set, skips
printing the header for the stats.");
try {
options = parser.parse(args);
} catch (OptionException e) {
@@ -335,8 +356,19 @@ public class ConsumerPerformance {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is
used to verify the consumer performance.");
- CommandLineUtils.checkRequiredArgs(parser, options,
numMessagesOpt);
+ CommandLineUtils.checkRequiredArgs(parser, options,
bootstrapServerOpt);
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt,
includeOpt);
+
+ CommandLineUtils.checkOneOfArgs(parser, options,
numMessagesOpt, numRecordsOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options,
consumerConfigOpt, commandConfigOpt);
+
+ if (options.has(numMessagesOpt)) {
+ System.out.println("Warning: --messages is deprecated. Use
--num-records instead.");
+ }
+
+ if (options.has(consumerConfigOpt)) {
+ System.out.println("Warning: --consumer.config is
deprecated. Use --command-config instead.");
+ }
}
}
@@ -348,10 +380,23 @@ public class ConsumerPerformance {
return options.valueOf(bootstrapServerOpt);
}
+ private Properties readProps(List<String> commandProperties, String
commandConfigFile) throws IOException {
+ Properties props = commandConfigFile != null
+ ? Utils.loadProps(commandConfigFile)
+ : new Properties();
+ props.putAll(parseKeyValueArgs(commandProperties));
+ return props;
+ }
+
public Properties props() throws IOException {
- Properties props = (options.has(consumerConfigOpt))
- ? Utils.loadProps(options.valueOf(consumerConfigOpt))
- : new Properties();
+ List<String> commandProperties =
options.valuesOf(commandPropertiesOpt);
+ String commandConfigFile;
+ if (options.has(consumerConfigOpt)) {
+ commandConfigFile = options.valueOf(consumerConfigOpt);
+ } else {
+ commandConfigFile = options.valueOf(commandConfigOpt);
+ }
+ Properties props = readProps(commandProperties, commandConfigFile);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerHostsAndPorts());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
options.valueOf(groupIdOpt));
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
options.valueOf(socketBufferSizeOpt).toString());
@@ -378,8 +423,10 @@ public class ConsumerPerformance {
: Optional.empty();
}
- public long numMessages() {
- return options.valueOf(numMessagesOpt);
+ public long numRecords() {
+ return options.has(numMessagesOpt)
+ ? options.valueOf(numMessagesOpt)
+ : options.valueOf(numRecordsOpt);
}
public long reportingIntervalMs() {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
index 5d6179efaeb..51c66704668 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
@@ -55,6 +55,7 @@ import joptsimple.OptionException;
import joptsimple.OptionSpec;
import static joptsimple.util.RegexMatcher.regex;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
public class ShareConsumerPerformance {
private static final Logger LOG =
LoggerFactory.getLogger(ShareConsumerPerformance.class);
@@ -67,7 +68,7 @@ public class ShareConsumerPerformance {
try {
LOG.info("Starting share consumer/consumers...");
ShareConsumerPerfOptions options = new
ShareConsumerPerfOptions(args);
- AtomicLong totalMessagesRead = new AtomicLong(0);
+ AtomicLong totalRecordsRead = new AtomicLong(0);
AtomicLong totalBytesRead = new AtomicLong(0);
if (!options.hideHeader())
@@ -78,7 +79,7 @@ public class ShareConsumerPerformance {
shareConsumers.add(shareConsumerCreator.apply(options.props()));
}
long startMs = System.currentTimeMillis();
- consume(shareConsumers, options, totalMessagesRead,
totalBytesRead, startMs);
+ consume(shareConsumers, options, totalRecordsRead, totalBytesRead,
startMs);
long endMs = System.currentTimeMillis();
List<Map<MetricName, ? extends Metric>> shareConsumersMetrics =
new ArrayList<>();
@@ -93,7 +94,7 @@ public class ShareConsumerPerformance {
// Print final stats for share group.
double elapsedSec = (endMs - startMs) / 1_000.0;
long fetchTimeInMs = endMs - startMs;
- printStats(totalBytesRead.get(), totalMessagesRead.get(),
elapsedSec, fetchTimeInMs, startMs, endMs,
+ printStats(totalBytesRead.get(), totalRecordsRead.get(),
elapsedSec, fetchTimeInMs, startMs, endMs,
options.dateFormat(), -1);
shareConsumersMetrics.forEach(ToolsUtils::printMetrics);
@@ -113,15 +114,15 @@ public class ShareConsumerPerformance {
private static void consume(List<ShareConsumer<byte[], byte[]>>
shareConsumers,
ShareConsumerPerfOptions options,
- AtomicLong totalMessagesRead,
+ AtomicLong totalRecordsRead,
AtomicLong totalBytesRead,
long startMs) throws ExecutionException,
InterruptedException {
- long numMessages = options.numMessages();
+ long numRecords = options.numRecords();
long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
shareConsumers.forEach(shareConsumer ->
shareConsumer.subscribe(options.topic()));
// Now start the benchmark.
- AtomicLong messagesRead = new AtomicLong(0);
+ AtomicLong recordsRead = new AtomicLong(0);
AtomicLong bytesRead = new AtomicLong(0);
List<ShareConsumerConsumption> shareConsumersConsumptionDetails = new
ArrayList<>();
@@ -133,7 +134,7 @@ public class ShareConsumerPerformance {
ShareConsumerConsumption shareConsumerConsumption = new
ShareConsumerConsumption(0, 0);
futures.add(executorService.submit(() -> {
try {
-
consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead,
bytesRead, options,
+
consumeRecordsForSingleShareConsumer(shareConsumers.get(index), recordsRead,
bytesRead, options,
shareConsumerConsumption, index + 1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -171,22 +172,22 @@ public class ShareConsumerPerformance {
// Print stats for share consumer.
double elapsedSec = (endMs - startMs) / 1_000.0;
long fetchTimeInMs = endMs - startMs;
- long messagesReadByConsumer =
shareConsumersConsumptionDetails.get(index).messagesConsumed();
+ long recordsReadByConsumer =
shareConsumersConsumptionDetails.get(index).recordsConsumed();
long bytesReadByConsumer =
shareConsumersConsumptionDetails.get(index).bytesConsumed();
- printStats(bytesReadByConsumer, messagesReadByConsumer,
elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1);
+ printStats(bytesReadByConsumer, recordsReadByConsumer,
elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1);
}
}
- if (messagesRead.get() < numMessages) {
- System.out.printf("WARNING: Exiting before consuming the expected
number of messages: timeout (%d ms) exceeded. " +
+ if (recordsRead.get() < numRecords) {
+ System.out.printf("WARNING: Exiting before consuming the expected
number of records: timeout (%d ms) exceeded. " +
"You can use the --timeout option to increase the
timeout.%n", recordFetchTimeoutMs);
}
- totalMessagesRead.set(messagesRead.get());
+ totalRecordsRead.set(recordsRead.get());
totalBytesRead.set(bytesRead.get());
}
- private static void
consumeMessagesForSingleShareConsumer(ShareConsumer<byte[], byte[]>
shareConsumer,
- AtomicLong
totalMessagesRead,
+ private static void
consumeRecordsForSingleShareConsumer(ShareConsumer<byte[], byte[]>
shareConsumer,
+ AtomicLong
totalRecordsRead,
AtomicLong
totalBytesRead,
ShareConsumerPerfOptions options,
ShareConsumerConsumption shareConsumerConsumption,
@@ -197,17 +198,17 @@ public class ShareConsumerPerformance {
long lastReportTimeMs = currentTimeMs;
long lastBytesRead = 0L;
- long lastMessagesRead = 0L;
- long messagesReadByConsumer = 0L;
+ long lastRecordsRead = 0L;
+ long recordsReadByConsumer = 0L;
long bytesReadByConsumer = 0L;
- while (totalMessagesRead.get() < options.numMessages() &&
currentTimeMs - lastConsumedTimeMs <= options.recordFetchTimeoutMs()) {
+ while (totalRecordsRead.get() < options.numRecords() && currentTimeMs
- lastConsumedTimeMs <= options.recordFetchTimeoutMs()) {
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(100));
currentTimeMs = System.currentTimeMillis();
if (!records.isEmpty())
lastConsumedTimeMs = currentTimeMs;
for (ConsumerRecord<byte[], byte[]> record : records) {
- messagesReadByConsumer += 1;
- totalMessagesRead.addAndGet(1);
+ recordsReadByConsumer += 1;
+ totalRecordsRead.addAndGet(1);
if (record.key() != null) {
bytesReadByConsumer += record.key().length;
totalBytesRead.addAndGet(record.key().length);
@@ -218,13 +219,13 @@ public class ShareConsumerPerformance {
}
if (currentTimeMs - lastReportTimeMs >=
options.reportingIntervalMs()) {
if (options.showDetailedStats())
- printShareConsumerProgress(bytesReadByConsumer,
lastBytesRead, messagesReadByConsumer, lastMessagesRead,
+ printShareConsumerProgress(bytesReadByConsumer,
lastBytesRead, recordsReadByConsumer, lastRecordsRead,
lastReportTimeMs, currentTimeMs, dateFormat,
index);
lastReportTimeMs = currentTimeMs;
- lastMessagesRead = messagesReadByConsumer;
+ lastRecordsRead = recordsReadByConsumer;
lastBytesRead = bytesReadByConsumer;
}
-
shareConsumerConsumption.updateMessagesConsumed(messagesReadByConsumer);
+
shareConsumerConsumption.updateRecordsConsumed(recordsReadByConsumer);
shareConsumerConsumption.updateBytesConsumed(bytesReadByConsumer);
}
}
@@ -232,8 +233,8 @@ public class ShareConsumerPerformance {
protected static void printShareConsumerProgress(long bytesRead,
long lastBytesRead,
- long messagesRead,
- long lastMessagesRead,
+ long recordsRead,
+ long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat,
@@ -242,18 +243,18 @@ public class ShareConsumerPerformance {
double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 *
1024);
double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
- double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) /
elapsedMs) * 1000.0;
+ double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) /
elapsedMs) * 1000.0;
long fetchTimeMs = endMs - startMs;
System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d for share consumer
%d", dateFormat.format(startMs), dateFormat.format(endMs),
- totalMbRead, intervalMbPerSec, intervalMessagesPerSec,
messagesRead, fetchTimeMs, index);
+ totalMbRead, intervalMbPerSec, intervalRecordsPerSec, recordsRead,
fetchTimeMs, index);
System.out.println();
}
// Prints stats for both share consumer and share group. For share group,
index is -1. For share consumer,
// index is >= 1.
private static void printStats(long bytesRead,
- long messagesRead,
+ long recordsRead,
double elapsedSec,
long fetchTimeInMs,
long startMs,
@@ -268,8 +269,8 @@ public class ShareConsumerPerformance {
dateFormat.format(endMs),
totalMbRead,
totalMbRead / elapsedSec,
- messagesRead / elapsedSec,
- messagesRead,
+ recordsRead / elapsedSec,
+ recordsRead,
fetchTimeInMs
);
return;
@@ -279,8 +280,8 @@ public class ShareConsumerPerformance {
dateFormat.format(endMs),
totalMbRead,
totalMbRead / elapsedSec,
- messagesRead / elapsedSec,
- messagesRead,
+ recordsRead / elapsedSec,
+ recordsRead,
fetchTimeInMs
);
}
@@ -290,12 +291,17 @@ public class ShareConsumerPerformance {
private final OptionSpec<String> topicOpt;
private final OptionSpec<String> groupIdOpt;
private final OptionSpec<Integer> fetchSizeOpt;
+ private final OptionSpec<String> commandPropertiesOpt;
private final OptionSpec<Integer> socketBufferSizeOpt;
+ @Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> consumerConfigOpt;
+ private final OptionSpec<String> commandConfigOpt;
private final OptionSpec<Void> printMetricsOpt;
private final OptionSpec<Void> showDetailedStatsOpt;
private final OptionSpec<Long> recordFetchTimeoutOpt;
+ @Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<Long> numMessagesOpt;
+ private final OptionSpec<Long> numRecordsOpt;
private final OptionSpec<Long> reportingIntervalOpt;
private final OptionSpec<String> dateFormatOpt;
private final OptionSpec<Void> hideHeaderOpt;
@@ -322,24 +328,39 @@ public class ShareConsumerPerformance {
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(1024 * 1024);
+ commandPropertiesOpt = parser.accepts("command-property", "Kafka
share consumer related configuration properties like client.id. " +
+ "These configs take precedence over those passed
via --command-config or --consumer.config.")
+ .withRequiredArg()
+ .describedAs("prop1=val1")
+ .ofType(String.class);
socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The
size of the tcp RECV size.")
.withRequiredArg()
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(2 * 1024 * 1024);
- consumerConfigOpt = parser.accepts("consumer.config", "Share
consumer config properties file.")
+ consumerConfigOpt = parser.accepts("consumer.config",
"(DEPRECATED) Share consumer config properties file. " +
+ "This option will be removed in a future version. Use
--command-config instead.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
+ commandConfigOpt = parser.accepts("command-config", "Config
properties file.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
printMetricsOpt = parser.accepts("print-metrics", "Print out the
metrics.");
showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If
set, stats are reported for each reporting " +
- "interval as configured by reporting-interval");
+ "interval as configured by reporting-interval.");
recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum
allowed time in milliseconds between returned records.")
.withOptionalArg()
.describedAs("milliseconds")
.ofType(Long.class)
.defaultsTo(10_000L);
- numMessagesOpt = parser.accepts("messages", "REQUIRED: The number
of messages to consume.")
+ numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The
number of records to consume. " +
+ "This option will be removed in a future version.
Use --num-records instead.")
+ .withRequiredArg()
+ .describedAs("count")
+ .ofType(Long.class);
+ numRecordsOpt = parser.accepts("num-records", "REQUIRED: The
number of records to consume.")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
@@ -355,7 +376,7 @@ public class ShareConsumerPerformance {
.describedAs("date format")
.ofType(String.class)
.defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
- hideHeaderOpt = parser.accepts("hide-header", "If set, skips
printing the header for the stats");
+ hideHeaderOpt = parser.accepts("hide-header", "If set, skips
printing the header for the stats.");
numThreadsOpt = parser.accepts("threads", "The number of share
consumers to use for sharing the load.")
.withRequiredArg()
.describedAs("count")
@@ -371,7 +392,18 @@ public class ShareConsumerPerformance {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is
used to verify the share consumer performance.");
- CommandLineUtils.checkRequiredArgs(parser, options, topicOpt,
numMessagesOpt);
+ CommandLineUtils.checkRequiredArgs(parser, options, topicOpt,
bootstrapServerOpt);
+
+ CommandLineUtils.checkOneOfArgs(parser, options,
numMessagesOpt, numRecordsOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options,
consumerConfigOpt, commandConfigOpt);
+
+ if (options.has(numMessagesOpt)) {
+ System.out.println("Warning: --messages is deprecated. Use
--num-records instead.");
+ }
+
+ if (options.has(consumerConfigOpt)) {
+ System.out.println("Warning: --consumer.config is
deprecated. Use --command-config instead.");
+ }
}
}
@@ -383,10 +415,23 @@ public class ShareConsumerPerformance {
return options.valueOf(bootstrapServerOpt);
}
- public Properties props() throws IOException {
- Properties props = (options.has(consumerConfigOpt))
- ? Utils.loadProps(options.valueOf(consumerConfigOpt))
+ private Properties readProps(List<String> commandProperties, String
commandConfigFile) throws IOException {
+ Properties props = commandConfigFile != null
+ ? Utils.loadProps(commandConfigFile)
: new Properties();
+ props.putAll(parseKeyValueArgs(commandProperties));
+ return props;
+ }
+
+ public Properties props() throws IOException {
+ List<String> commandProperties =
options.valuesOf(commandPropertiesOpt);
+ String commandConfigFile;
+ if (options.has(consumerConfigOpt)) {
+ commandConfigFile = options.valueOf(consumerConfigOpt);
+ } else {
+ commandConfigFile = options.valueOf(commandConfigOpt);
+ }
+ Properties props = readProps(commandProperties, commandConfigFile);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerHostsAndPorts());
props.put(ConsumerConfig.GROUP_ID_CONFIG,
options.valueOf(groupIdOpt));
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
options.valueOf(socketBufferSizeOpt).toString());
@@ -403,8 +448,10 @@ public class ShareConsumerPerformance {
return Set.of(options.valueOf(topicOpt));
}
- public long numMessages() {
- return options.valueOf(numMessagesOpt);
+ public long numRecords() {
+ return options.has(numMessagesOpt)
+ ? options.valueOf(numMessagesOpt)
+ : options.valueOf(numRecordsOpt);
}
public int threads() {
@@ -439,26 +486,26 @@ public class ShareConsumerPerformance {
}
}
- // Helper class to know the final messages and bytes consumer by share
consumer at the end of consumption.
+ // Helper class to know the final records and bytes consumed by share
consumer at the end of consumption.
private static class ShareConsumerConsumption {
- private long messagesConsumed;
+ private long recordsConsumed;
private long bytesConsumed;
- public ShareConsumerConsumption(long messagesConsumed, long
bytesConsumed) {
- this.messagesConsumed = messagesConsumed;
+ public ShareConsumerConsumption(long recordsConsumed, long
bytesConsumed) {
+ this.recordsConsumed = recordsConsumed;
this.bytesConsumed = bytesConsumed;
}
- public long messagesConsumed() {
- return messagesConsumed;
+ public long recordsConsumed() {
+ return recordsConsumed;
}
public long bytesConsumed() {
return bytesConsumed;
}
- public void updateMessagesConsumed(long messagesConsumed) {
- this.messagesConsumed = messagesConsumed;
+ public void updateRecordsConsumed(long recordsConsumed) {
+ this.recordsConsumed = recordsConsumed;
}
public void updateBytesConsumed(long bytesConsumed) {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
index df6c3a93966..497deb7808d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -74,7 +74,7 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
- "--messages", "10",
+ "--num-records", "10",
"--print-metrics"
};
@@ -82,15 +82,64 @@ public class ConsumerPerformanceTest {
assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.topic().get().contains("test"));
- assertEquals(10, config.numMessages());
+ assertEquals(10, config.numRecords());
}
@Test
- public void testConfigWithUnrecognizedOption() {
+ public void testBootstrapServerNotPresent() {
+ String[] args = new String[]{
+ "--topic", "test"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
+ new ConsumerPerformance.ConsumerPerfOptions(args));
+ assertTrue(err.contains("Missing required argument
\"[bootstrap-server]\""));
+ }
+
+ @Test
+ public void testNumOfRecordsNotPresent() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
+ new ConsumerPerformance.ConsumerPerfOptions(args));
+ assertTrue(err.contains("Exactly one of the following arguments is
required:"));
+ }
+
+ @Test
+ public void testMessagesDeprecated() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--messages", "10"
+ };
+
+ ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
+ assertEquals(10, config.numRecords());
+ }
+
+ @Test
+ public void testNumOfRecordsWithMessagesPresent() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
+ "--num-records", "20"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
+ new ConsumerPerformance.ConsumerPerfOptions(args));
+ assertTrue(err.contains("Exactly one of the following arguments is
required"));
+ }
+
+ @Test
+ public void testConfigWithUnrecognizedOption() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--num-records", "10",
"--new-consumer"
};
@@ -104,14 +153,14 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--include", "test.*",
- "--messages", "10"
+ "--num-records", "10"
};
ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.include().get().toString().contains("test.*"));
- assertEquals(10, config.numMessages());
+ assertEquals(10, config.numRecords());
}
@Test
@@ -120,7 +169,7 @@ public class ConsumerPerformanceTest {
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--include", "test.*",
- "--messages", "10"
+ "--num-records", "10"
};
String err = ToolsTestUtils.captureStandardErr(() -> new
ConsumerPerformance.ConsumerPerfOptions(args));
@@ -132,7 +181,7 @@ public class ConsumerPerformanceTest {
public void testConfigWithoutTopicAndInclude() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
- "--messages", "10"
+ "--num-records", "10"
};
String err = ToolsTestUtils.captureStandardErr(() -> new
ConsumerPerformance.ConsumerPerfOptions(args));
@@ -140,9 +189,36 @@ public class ConsumerPerformanceTest {
assertTrue(err.contains("Exactly one of the following arguments is
required: [topic], [include]"));
}
+ @Test
+ public void testCommandProperty() throws IOException {
+ Path configPath =
tempDir.resolve("test_command_property_consumer_perf.conf");
+ Files.deleteIfExists(configPath);
+ File tempFile = Files.createFile(configPath).toFile();
+ try (PrintWriter output = new
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+ output.println("client.id=consumer-1");
+ output.flush();
+ }
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--num-records", "10",
+ "--command-property", "client.id=consumer-2",
+ "--command-config", tempFile.getAbsolutePath(),
+ "--command-property", "prop=val"
+ };
+
+ ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
+
+ assertEquals("consumer-2",
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+ assertEquals("val", config.props().getProperty("prop"));
+ }
+
@Test
public void testClientIdOverride() throws IOException {
- File tempFile =
Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();
+ Path configPath =
tempDir.resolve("test_client_id_override_consumer_perf.conf");
+ Files.deleteIfExists(configPath);
+ File tempFile = Files.createFile(configPath).toFile();
try (PrintWriter output = new
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
output.println("client.id=consumer-1");
output.flush();
@@ -151,7 +227,29 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
- "--messages", "10",
+ "--num-records", "10",
+ "--command-config", tempFile.getAbsolutePath()
+ };
+
+ ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
+
+ assertEquals("consumer-1",
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+ }
+
+ @Test
+ public void testConsumerConfigDeprecated() throws IOException {
+ Path configPath =
tempDir.resolve("test_consumer_config_deprecated_consumer_perf.conf");
+ Files.deleteIfExists(configPath);
+ File tempFile = Files.createFile(configPath).toFile();
+ try (PrintWriter output = new
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+ output.println("client.id=consumer-1");
+ output.flush();
+ }
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--num-records", "10",
"--consumer.config", tempFile.getAbsolutePath()
};
@@ -160,12 +258,28 @@ public class ConsumerPerformanceTest {
assertEquals("consumer-1",
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
+ @Test
+ public void testCommandConfigWithConsumerConfigPresent() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--num-records", "10",
+ "--consumer.config", "some-path",
+ "--command-config", "some-path"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
+ new ConsumerPerformance.ConsumerPerfOptions(args));
+ assertTrue(err.contains(String.format("Option \"%s\" can't be used
with option \"%s\"",
+ "[consumer.config]", "[command-config]")));
+ }
+
@Test
public void testDefaultClientId() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
- "--messages", "10"
+ "--num-records", "10"
};
ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
@@ -178,7 +292,7 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
- "--messages", "0",
+ "--num-records", "0",
"--print-metrics"
};
diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java
index a22a97f8211..6cffde627bb 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java
@@ -67,7 +67,7 @@ public class ShareConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
- "--messages", "10",
+ "--num-records", "10",
"--print-metrics"
};
@@ -75,15 +75,65 @@ public class ShareConsumerPerformanceTest {
assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.topic().contains("test"));
- assertEquals(10, config.numMessages());
+ assertEquals(10, config.numRecords());
}
@Test
- public void testConfigWithUnrecognizedOption() {
+ public void testBootstrapServerNotPresent() {
+ String[] args = new String[]{
+ "--topic", "test"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
+ new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
+ assertTrue(err.contains("Missing required argument
\"[bootstrap-server]\""));
+ }
+
+ @Test
+ public void testNumOfRecordsNotPresent() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
+ new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
+ assertTrue(err.contains("Exactly one of the following arguments is
required:"));
+ }
+
+ @Test
+ public void testMessagesDeprecated() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--messages", "10"
+ };
+
+ ShareConsumerPerformance.ShareConsumerPerfOptions config =
+ new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
+ assertEquals(10, config.numRecords());
+ }
+
+ @Test
+ public void testNumOfRecordsWithMessagesPresent() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
+ "--num-records", "20"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
+ new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
+ assertTrue(err.contains("Exactly one of the following arguments is
required"));
+ }
+
+ @Test
+ public void testConfigWithUnrecognizedOption() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--num-records", "10",
"--new-share-consumer"
};
@@ -92,9 +142,36 @@ public class ShareConsumerPerformanceTest {
assertTrue(err.contains("new-share-consumer is not a recognized
option"));
}
+ @Test
+ public void testCommandProperty() throws IOException {
+ Path configPath =
tempDir.resolve("test_command_property_share_consumer_perf.conf");
+ Files.deleteIfExists(configPath);
+ File tempFile = Files.createFile(configPath).toFile();
+ try (PrintWriter output = new
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+ output.println("client.id=consumer-1");
+ output.flush();
+ }
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--num-records", "10",
+ "--command-property", "client.id=consumer-2",
+ "--command-config", tempFile.getAbsolutePath(),
+ "--command-property", "prop=val"
+ };
+
+ ShareConsumerPerformance.ShareConsumerPerfOptions config = new
ShareConsumerPerformance.ShareConsumerPerfOptions(args);
+
+ assertEquals("consumer-2",
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+ assertEquals("val", config.props().getProperty("prop"));
+ }
+
@Test
public void testClientIdOverride() throws IOException {
- File tempFile =
Files.createFile(tempDir.resolve("test_share_consumer_config.conf")).toFile();
+ Path configPath =
tempDir.resolve("test_client_id_override_share_consumer_perf.conf");
+ Files.deleteIfExists(configPath);
+ File tempFile = Files.createFile(configPath).toFile();
try (PrintWriter output = new
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
output.println("client.id=share-consumer-1");
output.flush();
@@ -103,8 +180,8 @@ public class ShareConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
- "--messages", "10",
- "--consumer.config", tempFile.getAbsolutePath()
+ "--num-records", "10",
+ "--command-config", tempFile.getAbsolutePath()
};
ShareConsumerPerformance.ShareConsumerPerfOptions config = new
ShareConsumerPerformance.ShareConsumerPerfOptions(args);
@@ -112,12 +189,51 @@ public class ShareConsumerPerformanceTest {
assertEquals("share-consumer-1",
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
+ @Test
+ public void testConsumerConfigDeprecated() throws IOException {
+ Path configPath =
tempDir.resolve("test_consumer_config_deprecated_share_consumer_perf.conf");
+ Files.deleteIfExists(configPath);
+ File tempFile = Files.createFile(configPath).toFile();
+ try (PrintWriter output = new
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+ output.println("client.id=share-consumer-1");
+ output.flush();
+ }
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--num-records", "10",
+ "--consumer.config", tempFile.getAbsolutePath()
+ };
+
+ ShareConsumerPerformance.ShareConsumerPerfOptions config =
+ new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
+
+ assertEquals("share-consumer-1",
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+ }
+
+ @Test
+ public void testCommandConfigWithConsumerConfigPresent() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--num-records", "10",
+ "--consumer.config", "some-path",
+ "--command-config", "some-path"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() ->
+ new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
+ assertTrue(err.contains(String.format("Option \"%s\" can't be used
with option \"%s\"",
+ "[consumer.config]", "[command-config]")));
+ }
+
@Test
public void testDefaultClientId() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
- "--messages", "10"
+ "--num-records", "10"
};
ShareConsumerPerformance.ShareConsumerPerfOptions config = new
ShareConsumerPerformance.ShareConsumerPerfOptions(args);
@@ -130,7 +246,7 @@ public class ShareConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
- "--messages", "0",
+ "--num-records", "0",
"--print-metrics"
};