This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c565ba1a04f KAFKA-19598: Command-line arguments for producer perf test (#20361) c565ba1a04f is described below commit c565ba1a04f30452500dbf1075f74866deb535ca Author: Andrew Schofield <aschofi...@confluent.io> AuthorDate: Fri Aug 22 18:14:14 2025 +0100 KAFKA-19598: Command-line arguments for producer perf test (#20361) This implements KIP-1147 for kafka-producer-perf-test.sh. Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- docs/ops.html | 2 +- .../services/performance/producer_performance.py | 2 +- .../apache/kafka/tools/ProducerPerformance.java | 191 +++++++++++------- .../kafka/tools/ProducerPerformanceTest.java | 213 +++++++++++++++++---- 4 files changed, 304 insertions(+), 104 deletions(-) diff --git a/docs/ops.html b/docs/ops.html index 61007a30b8f..f8040610070 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -4318,7 +4318,7 @@ $ bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost: <p>Try to send messages to the `tieredTopic` topic to roll the log segment:</p> -<pre><code class="language-bash">$ bin/kafka-producer-perf-test.sh --topic tieredTopic --num-records 1200 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092</code></pre> +<pre><code class="language-bash">$ bin/kafka-producer-perf-test.sh --bootstrap-server localhost:9092 --topic tieredTopic --num-records 1200 --record-size 1024 --throughput -1</code></pre> <p>Then, after the active segment is rolled, the old segment should be moved to the remote storage and get deleted. This can be verified by checking the remote log directory configured above. For example: diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index acfe4790d73..4a93541c7cb 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -91,7 +91,7 @@ class ProducerPerformanceService(HttpMetricsCollector, PerformanceService): cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\"; " % (get_log4j_config_param(node), get_log4j_config_for_tools(node)) cmd += "KAFKA_OPTS=%(kafka_opts)s KAFKA_HEAP_OPTS=\"-XX:+HeapDumpOnOutOfMemoryError\" %(kafka_run_class)s org.apache.kafka.tools.ProducerPerformance " \ - "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s %(metrics_props)s" % args + "--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --command-property bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s %(metrics_props)s" % args self.security_config.setup_node(node) if self.security_config.security_protocol != SecurityConfig.PLAINTEXT: diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index d6ed1d0a4ef..fa21432b587 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -81,7 +81,7 @@ public class ProducerPerformance { System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); } boolean isSteadyState = false; - stats = new Stats(config.numRecords, isSteadyState); + stats = new Stats(config.numRecords, config.reportingInterval, isSteadyState); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); @@ -101,7 +101,7 @@ public class ProducerPerformance { long sendStartMs = System.currentTimeMillis(); if ((isSteadyState = config.warmupRecords > 0) && i == config.warmupRecords) { - steadyStateStats = new Stats(config.numRecords - config.warmupRecords, isSteadyState); + steadyStateStats = new Stats(config.numRecords - config.warmupRecords, config.reportingInterval, isSteadyState); stats.suppressPrinting(); } cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); @@ -131,7 +131,7 @@ public class ProducerPerformance { steadyStateStats.printTotal(); } } else { - // Make sure all messages are sent before printing out the stats and the metrics + // Make sure all records are sent before printing out the stats and the metrics // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py // expects this class to work with older versions of the client jar that don't support flush(). producer.flush(); @@ -177,7 +177,7 @@ public class ProducerPerformance { } else if (payloadMonotonic) { payload = Long.toString(recordValue).getBytes(StandardCharsets.UTF_8); } else { - throw new IllegalArgumentException("no payload File Path or record Size or payload-monotonic option provided"); + throw new IllegalArgumentException("No payload file, record size or payload-monotonic option provided."); } return payload; } @@ -221,7 +221,7 @@ public class ProducerPerformance { } } - System.out.println("Number of messages read: " + payloadByteList.size()); + System.out.println("Number of records read: " + payloadByteList.size()); } return payloadByteList; @@ -230,24 +230,34 @@ public class ProducerPerformance { /** Get the command-line argument parser. */ static ArgumentParser argParser() { ArgumentParser parser = ArgumentParsers - .newArgumentParser("producer-performance") + .newArgumentParser("kafka-producer-perf-test") .defaultHelp(true) .description("This tool is used to verify the producer performance. To enable transactions, " + - "you can specify a transaction id or set a transaction duration using --transaction-duration-ms. " + - "There are three ways to specify the transaction id: set transaction.id=<id> via --producer-props, " + - "set transaction.id=<id> in the config file via --producer.config, or use --transaction-id <id>."); + "you can specify a transactional id or set a transaction duration using --transaction-duration-ms. " + + "There are three ways to specify the transactional id: set transactional.id=<id> via --command-property, " + + "set transactional.id=<id> in the config file via --command-config, or use --transactional-id <id>."); + + parser.addArgument("--bootstrap-server") + .action(store()) + .required(false) + .type(String.class) + .metavar("BOOTSTRAP-SERVERS") + .dest("bootstrapServers") + .help("The server(s) to connect to. This config takes precedence over bootstrap.servers specified " + + "via --command-property or --command-config."); MutuallyExclusiveGroup payloadOptions = parser .addMutuallyExclusiveGroup() .required(true) - .description("either --record-size or --payload-file must be specified but not both."); + .description("Note that you must provide exactly one of --record-size, --payload-file " + + "or --payload-monotonic."); parser.addArgument("--topic") .action(store()) .required(true) .type(String.class) .metavar("TOPIC") - .help("produce messages to this topic"); + .help("Produce records to this topic."); parser.addArgument("--num-records") .action(store()) @@ -255,7 +265,7 @@ public class ProducerPerformance { .type(Long.class) .metavar("NUM-RECORDS") .dest("numRecords") - .help("number of messages to produce"); + .help("Number of records to produce."); payloadOptions.addArgument("--record-size") .action(store()) @@ -263,7 +273,7 @@ public class ProducerPerformance { .type(Integer.class) .metavar("RECORD-SIZE") .dest("recordSize") - .help("message size in bytes. Note that you must provide exactly one of --record-size or --payload-file " + + .help("Record size in bytes. Note that you must provide exactly one of --record-size, --payload-file " + "or --payload-monotonic."); payloadOptions.addArgument("--payload-file") @@ -272,17 +282,17 @@ public class ProducerPerformance { .type(String.class) .metavar("PAYLOAD-FILE") .dest("payloadFile") - .help("file to read the message payloads from. This works only for UTF-8 encoded text files. " + - "Payloads will be read from this file and a payload will be randomly selected when sending messages. " + - "Note that you must provide exactly one of --record-size or --payload-file or --payload-monotonic."); + .help("File to read the record payloads from. This works only for UTF-8 encoded text files. " + + "Payloads will be read from this file and a payload will be randomly selected when sending records. " + + "Note that you must provide exactly one of --record-size, --payload-file or --payload-monotonic."); payloadOptions.addArgument("--payload-monotonic") .action(storeTrue()) .type(Boolean.class) .metavar("PAYLOAD-MONOTONIC") .dest("payloadMonotonic") - .help("payload is monotonically increasing integer. Note that you must provide exactly one of --record-size " + - "or --payload-file or --payload-monotonic."); + .help("Payload is a monotonically increasing integer. Note that you must provide exactly one of --record-size, " + + "--payload-file or --payload-monotonic."); parser.addArgument("--payload-delimiter") .action(store()) @@ -291,8 +301,7 @@ public class ProducerPerformance { .metavar("PAYLOAD-DELIMITER") .dest("payloadDelimiter") .setDefault("\\n") - .help("provides delimiter to be used when --payload-file is provided. " + - "Defaults to new line. " + + .help("Provides the delimiter to be used when --payload-file is provided. Defaults to new line. " + "Note that this parameter will be ignored if --payload-file is not provided."); parser.addArgument("--throughput") @@ -300,16 +309,26 @@ public class ProducerPerformance { .required(true) .type(Double.class) .metavar("THROUGHPUT") - .help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec. Set this to -1 to disable throttling."); + .help("Throttle maximum record throughput to *approximately* THROUGHPUT records/sec. Set this to -1 to disable throttling."); parser.addArgument("--producer-props") - .nargs("+") - .required(false) - .metavar("PROP-NAME=PROP-VALUE") - .type(String.class) - .dest("producerConfig") - .help("kafka producer related configuration properties like bootstrap.servers,client.id etc. " + - "These configs take precedence over those passed via --producer.config."); + .nargs("+") + .required(false) + .metavar("PROP-NAME=PROP-VALUE") + .type(String.class) + .dest("producerConfig") + .help("(DEPRECATED) Kafka producer related configuration properties like client.id. " + + "These configs take precedence over those passed via --command-config or --producer.config. " + + "This option will be removed in a future version. Use --command-property instead."); + + parser.addArgument("--command-property") + .nargs("+") + .required(false) + .metavar("PROP-NAME=PROP-VALUE") + .type(String.class) + .dest("commandProperties") + .help("Kafka producer related configuration properties like client.id. " + + "These configs take precedence over those passed via --command-config or --producer.config."); parser.addArgument("--producer.config") .action(store()) @@ -317,46 +336,64 @@ public class ProducerPerformance { .type(String.class) .metavar("CONFIG-FILE") .dest("producerConfigFile") - .help("producer config properties file."); + .help("(DEPRECATED) Producer config properties file. " + + "This option will be removed in a future version. Use --command-config instead."); + + parser.addArgument("--command-config") + .action(store()) + .required(false) + .type(String.class) + .metavar("CONFIG-FILE") + .dest("commandConfigFile") + .help("Producer config properties file."); parser.addArgument("--print-metrics") .action(storeTrue()) .type(Boolean.class) .metavar("PRINT-METRICS") .dest("printMetrics") - .help("print out metrics at the end of the test."); + .help("Print out metrics at the end of the test."); parser.addArgument("--transactional-id") - .action(store()) - .required(false) - .type(String.class) - .metavar("TRANSACTIONAL-ID") - .dest("transactionalId") - .help("The transactional id to use. This config takes precedence over the transactional.id " + - "specified via --producer.config or --producer-props. Note that if the transactional id " + - "is not specified while --transaction-duration-ms is provided, the default value for the " + - "transactional id will be performance-producer- followed by a random uuid."); + .action(store()) + .required(false) + .type(String.class) + .metavar("TRANSACTIONAL-ID") + .dest("transactionalId") + .help("The transactional id to use. This config takes precedence over the transactional.id " + + "specified via --command-property or --command-config. Note that if the transactional id " + + "is not specified while --transaction-duration-ms is provided, the default value for the " + + "transactional id will be performance-producer- followed by a random uuid."); parser.addArgument("--transaction-duration-ms") - .action(store()) - .required(false) - .type(Long.class) - .metavar("TRANSACTION-DURATION") - .dest("transactionDurationMs") - .help("The max age of each transaction. The commitTransaction will be called after this time has elapsed. " + - "The value should be greater than 0. If the transactional id is specified via --producer-props, " + - "--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " + - "the default value will be 3000."); + .action(store()) + .required(false) + .type(Long.class) + .metavar("TRANSACTION-DURATION") + .dest("transactionDurationMs") + .help("The maximum duration of each transaction. The commitTransaction will be called after this time has elapsed. " + + "The value should be greater than 0. If the transactional id is specified via --command-property, " + + "--command-config or --transactional-id but --transaction-duration-ms is not specified, " + + "the default value will be 3000."); parser.addArgument("--warmup-records") - .action(store()) - .required(false) - .type(Long.class) - .metavar("WARMUP-RECORDS") - .dest("warmupRecords") - .setDefault(0L) - .help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " + - "An additional summary line will be printed describing the steady-state statistics. (default: 0)."); + .action(store()) + .required(false) + .type(Long.class) + .metavar("WARMUP-RECORDS") + .dest("warmupRecords") + .setDefault(0L) + .help("The number of records to treat as warmup. These initial records will not be included in steady-state statistics. " + + "An additional summary line will be printed describing the steady-state statistics."); + + parser.addArgument("--reporting-interval") + .action(store()) + .required(false) + .type(Long.class) + .metavar("INTERVAL-MS") + .dest("reportingInterval") + .setDefault(5_000L) + .help("Interval in milliseconds at which to print progress info."); return parser; } @@ -381,7 +418,7 @@ public class ProducerPerformance { private final boolean isSteadyState; private boolean suppressPrint; - public Stats(long numRecords, boolean isSteadyState) { + public Stats(long numRecords, long reportingInterval, boolean isSteadyState) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); this.iteration = 0; @@ -394,7 +431,7 @@ public class ProducerPerformance { this.windowTotalLatency = 0; this.windowBytes = 0; this.totalLatency = 0; - this.reportingInterval = 5000; + this.reportingInterval = reportingInterval; this.isSteadyState = isSteadyState; this.suppressPrint = false; } @@ -529,6 +566,7 @@ public class ProducerPerformance { } static final class ConfigPostProcessor { + final String bootstrapServers; final String topicName; final long numRecords; final long warmupRecords; @@ -540,9 +578,11 @@ public class ProducerPerformance { final Long transactionDurationMs; final boolean transactionsEnabled; final List<byte[]> payloadByteList; + final long reportingInterval; public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOException, ArgumentParserException { Namespace namespace = parser.parseArgs(args); + this.bootstrapServers = namespace.getString("bootstrapServers"); this.topicName = namespace.getString("topic"); this.numRecords = namespace.getLong("numRecords"); this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0); @@ -550,33 +590,56 @@ public class ProducerPerformance { this.throughput = namespace.getDouble("throughput"); this.payloadMonotonic = namespace.getBoolean("payloadMonotonic"); this.shouldPrintMetrics = namespace.getBoolean("printMetrics"); + this.reportingInterval = namespace.getLong("reportingInterval"); List<String> producerConfigs = namespace.getList("producerConfig"); String producerConfigFile = namespace.getString("producerConfigFile"); + List<String> commandProperties = namespace.getList("commandProperties"); + String commandConfigFile = namespace.getString("commandConfigFile"); String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); if (numRecords <= 0) { - throw new ArgumentParserException("--num-records should be greater than zero", parser); + throw new ArgumentParserException("--num-records should be greater than zero.", parser); } if (warmupRecords >= numRecords) { throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser); } if (recordSize != null && recordSize <= 0) { - throw new ArgumentParserException("--record-size should be greater than zero", parser); + throw new ArgumentParserException("--record-size should be greater than zero.", parser); + } + if (bootstrapServers == null && commandProperties == null && producerConfigs == null && producerConfigFile == null && commandConfigFile == null) { + throw new ArgumentParserException("At least one of --bootstrap-server, --command-property, --producer-props, --producer.config or --command-config must be specified.", parser); + } + if (commandProperties != null && producerConfigs != null) { + throw new ArgumentParserException("--command-property and --producer-props cannot be specified together.", parser); } - if (producerConfigs == null && producerConfigFile == null) { - throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser); + if (commandConfigFile != null && producerConfigFile != null) { + throw new ArgumentParserException("--command-config and --producer.config cannot be specified together.", parser); } if (transactionDurationMsArg != null && transactionDurationMsArg <= 0) { - throw new ArgumentParserException("--transaction-duration-ms should be greater than zero", parser); + throw new ArgumentParserException("--transaction-duration-ms should be greater than zero.", parser); + } + if (reportingInterval <= 0) { + throw new ArgumentParserException("--reporting-interval should be greater than zero.", parser); } // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here. String payloadDelimiter = namespace.getString("payloadDelimiter").equals("\\n") ? "\n" : namespace.getString("payloadDelimiter"); this.payloadByteList = readPayloadFile(payloadFilePath, payloadDelimiter); - this.producerProps = readProps(producerConfigs, producerConfigFile); + if (producerConfigs != null) { + System.out.println("Option --producer-props has been deprecated and will be removed in a future version. Use --command-property instead."); + commandProperties = producerConfigs; + } + if (producerConfigFile != null) { + System.out.println("Option --producer.config has been deprecated and will be removed in a future version. Use --command-config instead."); + commandConfigFile = producerConfigFile; + } + this.producerProps = readProps(commandProperties, commandConfigFile); + if (bootstrapServers != null) { + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } // setup transaction related configs this.transactionsEnabled = transactionDurationMsArg != null || transactionIdArg != null diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 7217c2101a0..007a22a8020 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -188,7 +188,7 @@ public class ProducerPerformanceTest { "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(5)).send(any(), any()); verify(producerMock, times(1)).close(); @@ -205,7 +205,7 @@ public class ProducerPerformanceTest { "--throughput", "100", "--record-size", "100", "--transactional-id", "foobar", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(1)).beginTransaction(); verify(producerMock, times(1)).commitTransaction(); @@ -225,7 +225,7 @@ public class ProducerPerformanceTest { "--num-records", "10", "--throughput", "1", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); @@ -246,7 +246,7 @@ public class ProducerPerformanceTest { "--num-records", "10", "--throughput", "1", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); @@ -263,7 +263,7 @@ public class ProducerPerformanceTest { "--throughput", "100", "--record-size", "100", "--payload-monotonic", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser1 = ProducerPerformance.argParser(); ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser1.parseArgs(args1)); assertEquals("argument --payload-monotonic: not allowed with argument --record-size", thrown.getMessage()); @@ -274,7 +274,7 @@ public class ProducerPerformanceTest { "--throughput", "100", "--payload-file", "abc.txt", "--payload-monotonic", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser2 = ProducerPerformance.argParser(); thrown = assertThrows(ArgumentParserException.class, () -> parser2.parseArgs(args2)); assertEquals("argument --payload-monotonic: not allowed with argument --payload-file", thrown.getMessage()); @@ -287,8 +287,8 @@ public class ProducerPerformanceTest { "--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", - "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--record-size", "100", + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser = ProducerPerformance.argParser(); ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); assertEquals("unrecognized arguments: '--test'", thrown.getMessage()); @@ -301,7 +301,7 @@ public class ProducerPerformanceTest { "--num-records", "5", "--throughput", "1.25", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser = ProducerPerformance.argParser(); assertDoesNotThrow(() -> parser.parseArgs(args)); } @@ -354,7 +354,7 @@ public class ProducerPerformanceTest { SplittableRandom random = new SplittableRandom(0); IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random, false, 0L)); - assertEquals("no payload File Path or record Size or payload-monotonic option provided", thrown.getMessage()); + assertEquals("No payload file, record size or payload-monotonic option provided.", thrown.getMessage()); } @Test @@ -380,14 +380,14 @@ public class ProducerPerformanceTest { @Test public void testStatsInitializationWithLargeNumRecords() { long numRecords = Long.MAX_VALUE; - assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, false)); + assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000L, false)); } @Test public void testStatsCorrectness() throws Exception { ExecutorService singleThreaded = Executors.newSingleThreadExecutor(); final long numRecords = 1000000; - ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, false); + ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, 5000L, false); for (long i = 0; i < numRecords; i++) { final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats, null); CompletableFuture.runAsync(() -> callback.onCompletion(null, null), singleThreaded); @@ -412,11 +412,12 @@ public class ProducerPerformanceTest { "--throughput", "100", "--record-size", "100", "--print-metrics", - "--producer-props", "bootstrap.servers=localhost:9000", + "--bootstrap-server", "localhost:9000", "--transactional-id", "foobar", "--transaction-duration-ms", "5000", }; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9000", configs.bootstrapServers); assertEquals("Hello-Kafka", configs.topicName); assertEquals(5, configs.numRecords); assertEquals(100, configs.throughput); @@ -438,28 +439,28 @@ public class ProducerPerformanceTest { "--num-records", "5", "--throughput", "100", "--record-size", "100"}; - assertEquals("Either --producer-props or --producer.config must be specified.", - assertThrows(ArgumentParserException.class, - () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidProducerProps)).getMessage()); + assertEquals("At least one of --bootstrap-server, --command-property, --producer-props, --producer.config or --command-config must be specified.", + assertThrows(ArgumentParserException.class, + () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidProducerProps)).getMessage()); String[] invalidTransactionDurationMs = new String[]{ "--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000", + "--bootstrap-server", "localhost:9000", "--transaction-duration-ms", "0"}; - assertEquals("--transaction-duration-ms should be greater than zero", - assertThrows(ArgumentParserException.class, - () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidTransactionDurationMs)).getMessage()); + assertEquals("--transaction-duration-ms should be greater than zero.", + assertThrows(ArgumentParserException.class, + () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidTransactionDurationMs)).getMessage()); String[] invalidNumRecords = new String[]{ "--topic", "Hello-Kafka", "--num-records", "-5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; - assertEquals("--num-records should be greater than zero", + "--bootstrap-server", "localhost:9000"}; + assertEquals("--num-records should be greater than zero.", assertThrows(ArgumentParserException.class, () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidNumRecords)).getMessage()); @@ -468,10 +469,72 @@ public class ProducerPerformanceTest { "--num-records", "5", "--throughput", "100", "--record-size", "-100", - "--producer-props", "bootstrap.servers=localhost:9000"}; - assertEquals("--record-size should be greater than zero", + "--bootstrap-server", "localhost:9000"}; + assertEquals("--record-size should be greater than zero.", assertThrows(ArgumentParserException.class, () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidRecordSize)).getMessage()); + + String[] invalidReportingInterval = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--reporting-interval", "0", + "--bootstrap-server", "localhost:9000"}; + assertEquals("--reporting-interval should be greater than zero.", + assertThrows(ArgumentParserException.class, + () -> new ProducerPerformance.ConfigPostProcessor(parser, invalidReportingInterval)).getMessage()); + } + + @Test + public void testBootstrapServer() throws IOException, ArgumentParserException { + ArgumentParser parser = ProducerPerformance.argParser(); + String[] args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000"}; + ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9000", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--command-property", "bootstrap.servers=localhost:9001"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9001", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000", + "--command-property", "bootstrap.servers=localhost:9001"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9000", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9001"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9001", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000", + "--producer-props", "bootstrap.servers=localhost:9001"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertEquals("localhost:9000", configs.producerProps.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); } @Test @@ -482,7 +545,7 @@ public class ProducerPerformanceTest { "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertFalse(configs.transactionsEnabled); assertNull(configs.transactionDurationMs); @@ -490,14 +553,15 @@ public class ProducerPerformanceTest { } @Test - public void testEnableTransactionByProducerProps() throws IOException, ArgumentParserException { + public void testEnableTransactionByProducerProperty() throws IOException, ArgumentParserException { ArgumentParser parser = ProducerPerformance.argParser(); String[] args = new String[]{ "--topic", "Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000", "transactional.id=foobar"}; + "--bootstrap-server", "localhost:9000", + "--command-property", "transactional.id=foobar"}; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); @@ -513,8 +577,8 @@ public class ProducerPerformanceTest { "--num-records", "5", "--throughput", "100", "--record-size", "100", - "--producer.config", producerConfigFile.getAbsolutePath(), - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000", + "--command-config", producerConfigFile.getAbsolutePath()}; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); @@ -525,8 +589,55 @@ public class ProducerPerformanceTest { "--num-records", "5", "--throughput", "100", "--record-size", "100", + "--bootstrap-server", "localhost:9000", + "--command-config", producerConfigFile.getAbsolutePath(), + "--command-property", "transactional.id=hello_kafka"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertTrue(configs.transactionsEnabled); + assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); + assertEquals("hello_kafka", configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--transactional-id", "kafka_hello", + "--bootstrap-server", "localhost:9000", + "--command-config", producerConfigFile.getAbsolutePath(), + "--command-property", "transactional.id=hello_kafka"}; + configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertTrue(configs.transactionsEnabled); + assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); + assertEquals("kafka_hello", configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + + Utils.delete(producerConfigFile); + } + + @Test + public void testEnableTransactionByTransactionIdDeprecated() throws IOException, ArgumentParserException { + File producerConfigFile = createTempFile("transactional.id=foobar"); + ArgumentParser parser = ProducerPerformance.argParser(); + String[] args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000", + "--producer.config", producerConfigFile.getAbsolutePath()}; + ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); + assertTrue(configs.transactionsEnabled); + assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); + assertEquals("foobar", configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + + args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--bootstrap-server", "localhost:9000", "--producer.config", producerConfigFile.getAbsolutePath(), - "--producer-props", "bootstrap.servers=localhost:9000", "transactional.id=hello_kafka"}; + "--producer-props", "transactional.id=hello_kafka"}; configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); @@ -538,8 +649,9 @@ public class ProducerPerformanceTest { "--throughput", "100", "--record-size", "100", "--transactional-id", "kafka_hello", + "--bootstrap-server", "localhost:9000", "--producer.config", producerConfigFile.getAbsolutePath(), - "--producer-props", "bootstrap.servers=localhost:9000", "transactional.id=hello_kafka"}; + "--producer-props", "transactional.id=hello_kafka"}; configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(ProducerPerformance.DEFAULT_TRANSACTION_DURATION_MS, configs.transactionDurationMs); @@ -548,6 +660,31 @@ public class ProducerPerformanceTest { Utils.delete(producerConfigFile); } + @Test + public void testEnsureDeprecatedAndModernArgumentsNotBothSpecified() throws IOException { + File producerConfigFile = createTempFile("bootstrap.servers=localhost:9000"); + String[] args = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--producer.config", producerConfigFile.getAbsolutePath(), + "--command-config", producerConfigFile.getAbsolutePath()}; + ArgumentParser parser = ProducerPerformance.argParser(); + assertThrows(ArgumentParserException.class, () -> new ProducerPerformance.ConfigPostProcessor(parser, args)); + + String[] args2 = new String[]{ + "--topic", "Hello-Kafka", + "--num-records", "5", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9090", + "--command-property", "bootstrap.servers=localhost:9090"}; + assertThrows(ArgumentParserException.class, () -> new ProducerPerformance.ConfigPostProcessor(parser, args2)); + + Utils.delete(producerConfigFile); + } + @Test public void testEnableTransactionByTransactionDurationMs() throws IOException, ArgumentParserException { ArgumentParser parser = ProducerPerformance.argParser(); @@ -557,7 +694,7 @@ public class ProducerPerformanceTest { "--throughput", "100", "--record-size", "100", "--transaction-duration-ms", "5000", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ProducerPerformance.ConfigPostProcessor configs = new ProducerPerformance.ConfigPostProcessor(parser, args); assertTrue(configs.transactionsEnabled); assertEquals(5000, configs.transactionDurationMs); @@ -566,28 +703,28 @@ public class ProducerPerformanceTest { } @Test - public void testWarmupRecordsFractionalValue() throws Exception { + public void testWarmupRecordsFractionalValue() { String[] args = new String[] { "--topic", "Hello-Kafka", "--num-records", "10", "--warmup-records", "1.5", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser = ProducerPerformance.argParser(); ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); thrown.printStackTrace(); } @Test - public void testWarmupRecordsString() throws Exception { + public void testWarmupRecordsString() { String[] args = new String[] { "--topic", "Hello-Kafka", "--num-records", "10", "--warmup-records", "foo", "--throughput", "100", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; ArgumentParser parser = ProducerPerformance.argParser(); ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); thrown.printStackTrace(); @@ -607,7 +744,7 @@ public class ProducerPerformanceTest { "--warmup-records", "2", "--throughput", "1", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); @@ -630,7 +767,7 @@ public class ProducerPerformanceTest { "--warmup-records", "-1", "--throughput", "1", "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; + "--bootstrap-server", "localhost:9000"}; producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any());