This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 07e2f6cd4d0 KAFKA-14578: Move ConsumerPerformance to tools (#13215)
07e2f6cd4d0 is described below
commit 07e2f6cd4d0ba703db510ba72354f8e500f72536
Author: Federico Valeri <[email protected]>
AuthorDate: Mon Mar 6 18:16:55 2023 +0100
KAFKA-14578: Move ConsumerPerformance to tools (#13215)
Reviewers: Mickael Maison <[email protected]>, Alexandre Dupriez
<[email protected]>
---
bin/kafka-consumer-perf-test.sh | 2 +-
bin/windows/kafka-consumer-perf-test.bat | 2 +-
.../scala/kafka/tools/ConsumerPerformance.scala | 306 ----------------
core/src/main/scala/kafka/tools/PerfConfig.scala | 39 --
.../unit/kafka/tools/ConsumerPerformanceTest.scala | 166 ---------
tests/kafkatest/benchmarks/core/benchmark_test.py | 2 +-
.../services/performance/consumer_performance.py | 2 +-
.../apache/kafka/tools/ConsumerPerformance.java | 407 +++++++++++++++++++++
.../kafka/tools/ConsumerPerformanceTest.java | 172 +++++++++
9 files changed, 583 insertions(+), 515 deletions(-)
diff --git a/bin/kafka-consumer-perf-test.sh b/bin/kafka-consumer-perf-test.sh
index 77cda721d6c..4eebe87a5fb 100755
--- a/bin/kafka-consumer-perf-test.sh
+++ b/bin/kafka-consumer-perf-test.sh
@@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance "$@"
+exec $(dirname $0)/kafka-run-class.sh
org.apache.kafka.tools.ConsumerPerformance "$@"
diff --git a/bin/windows/kafka-consumer-perf-test.bat
b/bin/windows/kafka-consumer-perf-test.bat
index 606c784605a..17e17d3b105 100644
--- a/bin/windows/kafka-consumer-perf-test.bat
+++ b/bin/windows/kafka-consumer-perf-test.bat
@@ -16,5 +16,5 @@ rem limitations under the License.
SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
-"%~dp0kafka-run-class.bat" kafka.tools.ConsumerPerformance %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConsumerPerformance %*
EndLocal
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
deleted file mode 100644
index 56f49456705..00000000000
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.text.SimpleDateFormat
-import java.time.Duration
-import java.util
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{Properties, Random}
-import com.typesafe.scalalogging.LazyLogging
-import joptsimple.OptionException
-import kafka.utils.ToolsUtils
-import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener,
KafkaConsumer}
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
-import org.apache.kafka.server.util.CommandLineUtils
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable
-
-/**
- * Performance test for the full zookeeper consumer
- */
-object ConsumerPerformance extends LazyLogging {
-
- def main(args: Array[String]): Unit = {
-
- val config = new ConsumerPerfConfig(args)
- logger.info("Starting consumer...")
- val totalMessagesRead = new AtomicLong(0)
- val totalBytesRead = new AtomicLong(0)
- var metrics: mutable.Map[MetricName, _ <: Metric] = null
- val joinGroupTimeInMs = new AtomicLong(0)
-
- if (!config.hideHeader)
- printHeader(config.showDetailedStats)
-
- var startMs, endMs = 0L
- val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
- startMs = System.currentTimeMillis
- consume(consumer, List(config.topic), config.numMessages,
config.recordFetchTimeoutMs, config, totalMessagesRead, totalBytesRead,
joinGroupTimeInMs, startMs)
- endMs = System.currentTimeMillis
-
- if (config.printMetrics) {
- metrics = consumer.metrics.asScala
- }
- consumer.close()
- val elapsedSecs = (endMs - startMs) / 1000.0
- val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get
- if (!config.showDetailedStats) {
- val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
- println("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f".format(
- config.dateFormat.format(startMs),
- config.dateFormat.format(endMs),
- totalMBRead,
- totalMBRead / elapsedSecs,
- totalMessagesRead.get,
- totalMessagesRead.get / elapsedSecs,
- joinGroupTimeInMs.get,
- fetchTimeInMs,
- totalMBRead / (fetchTimeInMs / 1000.0),
- totalMessagesRead.get / (fetchTimeInMs / 1000.0)
- ))
- }
-
- if (metrics != null) {
- ToolsUtils.printMetrics(metrics)
- }
-
- }
-
- private[tools] def printHeader(showDetailedStats: Boolean): Unit = {
- val newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec,
fetch.nMsg.sec"
- if (!showDetailedStats)
- println("start.time, end.time, data.consumed.in.MB, MB.sec,
data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader)
- else
- println("time, threadId, data.consumed.in.MB, MB.sec,
data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader)
- }
-
- def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
- topics: List[String],
- count: Long,
- timeout: Long,
- config: ConsumerPerfConfig,
- totalMessagesRead: AtomicLong,
- totalBytesRead: AtomicLong,
- joinTime: AtomicLong,
- testStartTime: Long): Unit = {
- var bytesRead = 0L
- var messagesRead = 0L
- var lastBytesRead = 0L
- var lastMessagesRead = 0L
- var joinStart = System.currentTimeMillis
- var joinTimeMsInSingleRound = 0L
-
- consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
- def onPartitionsAssigned(partitions: util.Collection[TopicPartition]):
Unit = {
- joinTime.addAndGet(System.currentTimeMillis - joinStart)
- joinTimeMsInSingleRound += System.currentTimeMillis - joinStart
- }
- def onPartitionsRevoked(partitions: util.Collection[TopicPartition]):
Unit = {
- joinStart = System.currentTimeMillis
- }})
-
- // Now start the benchmark
- var currentTimeMillis = System.currentTimeMillis
- var lastReportTime: Long = currentTimeMillis
- var lastConsumedTime = currentTimeMillis
-
- while (messagesRead < count && currentTimeMillis - lastConsumedTime <=
timeout) {
- val records = consumer.poll(Duration.ofMillis(100)).asScala
- currentTimeMillis = System.currentTimeMillis
- if (records.nonEmpty)
- lastConsumedTime = currentTimeMillis
- for (record <- records) {
- messagesRead += 1
- if (record.key != null)
- bytesRead += record.key.size
- if (record.value != null)
- bytesRead += record.value.size
-
- if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
- if (config.showDetailedStats)
- printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead,
lastMessagesRead,
- lastReportTime, currentTimeMillis, config.dateFormat,
joinTimeMsInSingleRound)
- joinTimeMsInSingleRound = 0L
- lastReportTime = currentTimeMillis
- lastMessagesRead = messagesRead
- lastBytesRead = bytesRead
- }
- }
- }
-
- if (messagesRead < count)
- println(s"WARNING: Exiting before consuming the expected number of
messages: timeout ($timeout ms) exceeded. " +
- "You can use the --timeout option to increase the timeout.")
- totalMessagesRead.set(messagesRead)
- totalBytesRead.set(bytesRead)
- }
-
- def printConsumerProgress(id: Int,
- bytesRead: Long,
- lastBytesRead: Long,
- messagesRead: Long,
- lastMessagesRead: Long,
- startMs: Long,
- endMs: Long,
- dateFormat: SimpleDateFormat,
- periodicJoinTimeInMs: Long): Unit = {
- printBasicProgress(id, bytesRead, lastBytesRead, messagesRead,
lastMessagesRead, startMs, endMs, dateFormat)
- printExtendedProgress(bytesRead, lastBytesRead, messagesRead,
lastMessagesRead, startMs, endMs, periodicJoinTimeInMs)
- println()
- }
-
- private def printBasicProgress(id: Int,
- bytesRead: Long,
- lastBytesRead: Long,
- messagesRead: Long,
- lastMessagesRead: Long,
- startMs: Long,
- endMs: Long,
- dateFormat: SimpleDateFormat): Unit = {
- val elapsedMs: Double = (endMs - startMs).toDouble
- val totalMbRead = (bytesRead * 1.0) / (1024 * 1024)
- val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
- val intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs
- val intervalMessagesPerSec = ((messagesRead - lastMessagesRead) /
elapsedMs) * 1000.0
- print("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id,
totalMbRead,
- intervalMbPerSec, messagesRead, intervalMessagesPerSec))
- }
-
- private def printExtendedProgress(bytesRead: Long,
- lastBytesRead: Long,
- messagesRead: Long,
- lastMessagesRead: Long,
- startMs: Long,
- endMs: Long,
- periodicJoinTimeInMs: Long): Unit = {
- val fetchTimeMs = endMs - startMs - periodicJoinTimeInMs
- val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
- val intervalMessagesRead = messagesRead - lastMessagesRead
- val (intervalMbPerSec, intervalMessagesPerSec) = if (fetchTimeMs <= 0)
- (0.0, 0.0)
- else
- (1000.0 * intervalMbRead / fetchTimeMs, 1000.0 * intervalMessagesRead /
fetchTimeMs)
- print(", %d, %d, %.4f, %.4f".format(periodicJoinTimeInMs, fetchTimeMs,
intervalMbPerSec, intervalMessagesPerSec))
- }
-
- class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
- val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use
--bootstrap-server instead; ignored if --bootstrap-server is specified. The
broker list string in the form HOST1:PORT1,HOST2:PORT2.")
- .withRequiredArg
- .describedAs("broker-list")
- .ofType(classOf[String])
- val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED
unless --broker-list(deprecated) is specified. The server(s) to connect to.")
- .requiredUnless("broker-list")
- .withRequiredArg
- .describedAs("server to connect to")
- .ofType(classOf[String])
- val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume
from.")
- .withRequiredArg
- .describedAs("topic")
- .ofType(classOf[String])
- val groupIdOpt = parser.accepts("group", "The group id to consume on.")
- .withRequiredArg
- .describedAs("gid")
- .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
- .ofType(classOf[String])
- val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to
fetch in a single request.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1024 * 1024)
- val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the
consumer does not already have an established " +
- "offset to consume from, start with the latest message present in the
log rather than the earliest message.")
- val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size
of the tcp RECV size.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(2 * 1024 * 1024)
- val numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED:
Number of processing threads.")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(10)
- val numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED AND
IGNORED: Number of fetcher threads.")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
- val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config
properties file.")
- .withRequiredArg
- .describedAs("config file")
- .ofType(classOf[String])
- val printMetricsOpt = parser.accepts("print-metrics", "Print out the
metrics.")
- val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set,
stats are reported for each reporting " +
- "interval as configured by reporting-interval")
- val recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed
time in milliseconds between returned records.")
- .withOptionalArg()
- .describedAs("milliseconds")
- .ofType(classOf[Long])
- .defaultsTo(10000)
-
- try
- options = parser.parse(args: _*)
- catch {
- case e: OptionException =>
- CommandLineUtils.printUsageAndExit(parser, e.getMessage)
- }
-
- if(options.has(numThreadsOpt) || options.has(numFetchersOpt))
- println("WARNING: option [threads] and [num-fetch-threads] have been
deprecated and will be ignored by the test")
-
- CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps in
performance test for the full zookeeper consumer")
-
- CommandLineUtils.checkRequiredArgs(parser, options, topicOpt,
numMessagesOpt)
-
- val printMetrics = options.has(printMetricsOpt)
-
- val props = if (options.has(consumerConfigOpt))
- Utils.loadProps(options.valueOf(consumerConfigOpt))
- else
- new Properties
-
- import org.apache.kafka.clients.consumer.ConsumerConfig
-
- val brokerHostsAndPorts = options.valueOf(if
(options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts)
- props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
- props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
options.valueOf(socketBufferSizeOpt).toString)
- props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
options.valueOf(fetchSizeOpt).toString)
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if
(options.has(resetBeginningOffsetOpt)) "latest" else "earliest")
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
classOf[ByteArrayDeserializer])
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
classOf[ByteArrayDeserializer])
- props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
- if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client")
-
- val numThreads = options.valueOf(numThreadsOpt).intValue
- val topic = options.valueOf(topicOpt)
- val numMessages = options.valueOf(numMessagesOpt).longValue
- val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
- if (reportingInterval <= 0)
- throw new IllegalArgumentException("Reporting interval must be greater
than 0.")
- val showDetailedStats = options.has(showDetailedStatsOpt)
- val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
- val hideHeader = options.has(hideHeaderOpt)
- val recordFetchTimeoutMs =
options.valueOf(recordFetchTimeoutOpt).longValue()
- }
-
-}
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala
b/core/src/main/scala/kafka/tools/PerfConfig.scala
deleted file mode 100644
index 6857e401e80..00000000000
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.tools
-
-import org.apache.kafka.server.util.CommandDefaultOptions
-
-class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) {
- val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of
messages to send or consume")
- .withRequiredArg
- .describedAs("count")
- .ofType(classOf[java.lang.Long])
- val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in
milliseconds at which to print progress info.")
- .withRequiredArg
- .describedAs("interval_ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(5000)
- val dateFormatOpt = parser.accepts("date-format", "The date format to use
for formatting the time field. " +
- "See java.text.SimpleDateFormat for options.")
- .withRequiredArg
- .describedAs("date format")
- .ofType(classOf[String])
- .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS")
- val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing
the header for the stats ")
-}
diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
deleted file mode 100644
index d872a40bc1d..00000000000
--- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io.{ByteArrayOutputStream, PrintWriter}
-import java.text.SimpleDateFormat
-import kafka.utils.{Exit, TestUtils}
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
-import org.junit.jupiter.api.Test
-
-class ConsumerPerformanceTest {
- private val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")
-
- @Test
- def testDetailedHeaderMatchBody(): Unit = {
- testHeaderMatchContent(detailed = true, 2,
- () => ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0,
0, 1, dateFormat, 1L))
- }
-
- @Test
- def testNonDetailedHeaderMatchBody(): Unit = {
- testHeaderMatchContent(detailed = false, 2, () =>
println(s"${dateFormat.format(System.currentTimeMillis)}, " +
- s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0, 1,
1, 1.1, 1.1"))
- }
-
- @Test
- def testConfigBrokerList(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--broker-list", "localhost:9092",
- "--topic", "test",
- "--messages", "10"
- )
-
- //When
- val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.brokerHostsAndPorts)
- assertEquals("test", config.topic)
- assertEquals(10, config.numMessages)
- }
-
- @Test
- def testConfigBootStrapServer(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--messages", "10",
- "--print-metrics"
- )
-
- //When
- val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.brokerHostsAndPorts)
- assertEquals("test", config.topic)
- assertEquals(10, config.numMessages)
- }
-
- @Test
- def testBrokerListOverride(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--broker-list", "localhost:9094",
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--messages", "10"
- )
-
- //When
- val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
- //Then
- assertEquals("localhost:9092", config.brokerHostsAndPorts)
- assertEquals("test", config.topic)
- assertEquals(10, config.numMessages)
- }
-
- @Test
- def testConfigWithUnrecognizedOption(): Unit = {
- Exit.setExitProcedure((_, message) => throw new
IllegalArgumentException(message.orNull))
- //Given
- val args: Array[String] = Array(
- "--broker-list", "localhost:9092",
- "--topic", "test",
- "--messages", "10",
- "--new-consumer"
- )
- try assertThrows(classOf[IllegalArgumentException], () => new
ConsumerPerformance.ConsumerPerfConfig(args))
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def testClientIdOverride(): Unit = {
- val consumerConfigFile = TestUtils.tempFile("test_consumer_config",".conf")
- new PrintWriter(consumerConfigFile.getPath) {
write("client.id=consumer-1"); close() }
-
- //Given
- val args: Array[String] = Array(
- "--broker-list", "localhost:9092",
- "--topic", "test",
- "--messages", "10",
- "--consumer.config", consumerConfigFile.getPath
- )
-
- //When
- val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
- //Then
- assertEquals("consumer-1",
config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
- }
-
- @Test
- def testDefaultClientId(): Unit = {
- //Given
- val args: Array[String] = Array(
- "--broker-list", "localhost:9092",
- "--topic", "test",
- "--messages", "10"
- )
-
- //When
- val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
- //Then
- assertEquals("perf-consumer-client",
config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
- }
-
- private def testHeaderMatchContent(detailed: Boolean,
expectedOutputLineCount: Int, fun: () => Unit): Unit = {
- val outContent = new ByteArrayOutputStream
- try {
- Console.withOut(outContent) {
- ConsumerPerformance.printHeader(detailed)
- fun()
-
- val contents = outContent.toString.split("\n")
- assertEquals(expectedOutputLineCount, contents.length)
- val header = contents(0)
- val body = contents(1)
-
- assertEquals(header.split(",\\s").length, body.split(",\\s").length)
- }
- } finally {
- outContent.close()
- }
- }
-}
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py
b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 959f23a366c..497569650cc 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -202,7 +202,7 @@ class Benchmark(Test):
Return aggregate throughput statistics for both producer and consumer.
- (Under the hood, this runs ProducerPerformance.java, and
ConsumerPerformance.scala)
+ (Under the hood, this runs ProducerPerformance.java, and
ConsumerPerformance.java)
"""
client_version = KafkaVersion(client_version)
broker_version = KafkaVersion(broker_version)
diff --git a/tests/kafkatest/services/performance/consumer_performance.py
b/tests/kafkatest/services/performance/consumer_performance.py
index 6df8dfb6be8..ed7cc99f86a 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -23,7 +23,7 @@ from kafkatest.version import DEV_BRANCH, V_2_0_0,
LATEST_0_10_0
class ConsumerPerformanceService(PerformanceService):
"""
- See ConsumerPerformance.scala as the source of truth on these
settings, but for reference:
+ See ConsumerPerformance tool as the source of truth on these settings,
but for reference:
"zookeeper" "The connection string for the zookeeper connection in the
form host:port. Multiple URLS can
be given to allow fail-over. This option is only used
with the old consumer."
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
new file mode 100644
index 00000000000..56c51f34a84
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static joptsimple.util.RegexMatcher.regex;
+
+public class ConsumerPerformance {
+ private static final Logger LOG =
LoggerFactory.getLogger(ConsumerPerformance.class);
+ private static final Random RND = new Random();
+
+ public static void main(String[] args) {
+ try {
+ LOG.info("Starting consumer...");
+ ConsumerPerfOptions options = new ConsumerPerfOptions(args);
+ AtomicLong totalMessagesRead = new AtomicLong(0);
+ AtomicLong totalBytesRead = new AtomicLong(0);
+ AtomicLong joinTimeMs = new AtomicLong(0);
+ AtomicLong joinTimeMsInSingleRound = new AtomicLong(0);
+
+ if (!options.hideHeader())
+ printHeader(options.showDetailedStats());
+
+ KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(options.props());
+ long bytesRead = 0L;
+ long messagesRead = 0L;
+ long lastBytesRead = 0L;
+ long lastMessagesRead = 0L;
+ long currentTimeMs = System.currentTimeMillis();
+ long joinStartMs = currentTimeMs;
+ long startMs = currentTimeMs;
+ consume(consumer, options, totalMessagesRead, totalBytesRead,
joinTimeMs,
+ bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
+ joinStartMs, joinTimeMsInSingleRound);
+ long endMs = System.currentTimeMillis();
+
+ Map<MetricName, ? extends Metric> metrics = null;
+ if (options.printMetrics())
+ metrics = consumer.metrics();
+ consumer.close();
+
+ // print final stats
+ double elapsedSec = (endMs - startMs) / 1_000.0;
+ long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get();
+ if (!options.showDetailedStats()) {
+ double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 *
1024);
+ System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f,
%.4f%n",
+ options.dateFormat().format(startMs),
+ options.dateFormat().format(endMs),
+ totalMbRead,
+ totalMbRead / elapsedSec,
+ totalMessagesRead.get(),
+ totalMessagesRead.get() / elapsedSec,
+ joinTimeMs.get(),
+ fetchTimeInMs,
+ totalMbRead / (fetchTimeInMs / 1000.0),
+ totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
+ );
+ }
+
+ if (metrics != null)
+ ToolsUtils.printMetrics(metrics);
+ } catch (Throwable e) {
+ System.err.println(e.getMessage());
+ System.err.println(Utils.stackTrace(e));
+ Exit.exit(1);
+ }
+ }
+
+ protected static void printHeader(boolean showDetailedStats) {
+ String newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms,
fetch.MB.sec, fetch.nMsg.sec";
+ if (!showDetailedStats)
+ System.out.printf("start.time, end.time, data.consumed.in.MB,
MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader);
+ else
+ System.out.printf("time, threadId, data.consumed.in.MB, MB.sec,
data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader);
+ }
+
+ private static void consume(KafkaConsumer<byte[], byte[]> consumer,
+ ConsumerPerfOptions options,
+ AtomicLong totalMessagesRead,
+ AtomicLong totalBytesRead,
+ AtomicLong joinTimeMs,
+ long bytesRead,
+ long messagesRead,
+ long lastBytesRead,
+ long lastMessagesRead,
+ long joinStartMs,
+ AtomicLong joinTimeMsInSingleRound) {
+ long numMessages = options.numMessages();
+ long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
+ long reportingIntervalMs = options.reportingIntervalMs();
+ boolean showDetailedStats = options.showDetailedStats();
+ SimpleDateFormat dateFormat = options.dateFormat();
+ consumer.subscribe(options.topic(),
+ new ConsumerPerfRebListener(joinTimeMs, joinStartMs,
joinTimeMsInSingleRound));
+
+ // now start the benchmark
+ long currentTimeMs = System.currentTimeMillis();
+ long lastReportTimeMs = currentTimeMs;
+ long lastConsumedTimeMs = currentTimeMs;
+
+ while (messagesRead < numMessages && currentTimeMs -
lastConsumedTimeMs <= recordFetchTimeoutMs) {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(100));
+ currentTimeMs = System.currentTimeMillis();
+ if (!records.isEmpty())
+ lastConsumedTimeMs = currentTimeMs;
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ messagesRead += 1;
+ if (record.key() != null)
+ bytesRead += record.key().length;
+ if (record.value() != null)
+ bytesRead += record.value().length;
+ if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
+ if (showDetailedStats)
+ printConsumerProgress(0, bytesRead, lastBytesRead,
messagesRead, lastMessagesRead,
+ lastReportTimeMs, currentTimeMs, dateFormat,
joinTimeMsInSingleRound.get());
+ joinTimeMsInSingleRound = new AtomicLong(0);
+ lastReportTimeMs = currentTimeMs;
+ lastMessagesRead = messagesRead;
+ lastBytesRead = bytesRead;
+ }
+ }
+ }
+
+ if (messagesRead < numMessages)
+ System.out.printf("WARNING: Exiting before consuming the expected
number of messages: timeout (%d ms) exceeded. " +
+ "You can use the --timeout option to increase the timeout.%n",
recordFetchTimeoutMs);
+ totalMessagesRead.set(messagesRead);
+ totalBytesRead.set(bytesRead);
+ }
+
+ protected static void printConsumerProgress(int id,
+ long bytesRead,
+ long lastBytesRead,
+ long messagesRead,
+ long lastMessagesRead,
+ long startMs,
+ long endMs,
+ SimpleDateFormat dateFormat,
+ long joinTimeMsInSingleRound) {
+ printBasicProgress(id, bytesRead, lastBytesRead, messagesRead,
lastMessagesRead, startMs, endMs, dateFormat);
+ printExtendedProgress(bytesRead, lastBytesRead, messagesRead,
lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
+ System.out.println();
+ }
+
+ private static void printBasicProgress(int id,
+ long bytesRead,
+ long lastBytesRead,
+ long messagesRead,
+ long lastMessagesRead,
+ long startMs,
+ long endMs,
+ SimpleDateFormat dateFormat) {
+ double elapsedMs = endMs - startMs;
+ double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
+ double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 *
1024);
+ double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
+ double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) /
elapsedMs) * 1000.0;
+ System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f",
dateFormat.format(endMs), id,
+ totalMbRead, intervalMbPerSec, messagesRead,
intervalMessagesPerSec);
+ }
+
+ private static void printExtendedProgress(long bytesRead,
+ long lastBytesRead,
+ long messagesRead,
+ long lastMessagesRead,
+ long startMs,
+ long endMs,
+ long joinTimeMsInSingleRound) {
+ long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
+ double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 *
1024);
+ long intervalMessagesRead = messagesRead - lastMessagesRead;
+ double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 *
intervalMbRead / fetchTimeMs;
+ double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 *
intervalMessagesRead / fetchTimeMs;
+ System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
+ fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
+ }
+
+ public static class ConsumerPerfRebListener implements
ConsumerRebalanceListener {
+ private AtomicLong joinTimeMs;
+ private AtomicLong joinTimeMsInSingleRound;
+ private long joinStartMs;
+
+ public ConsumerPerfRebListener(AtomicLong joinTimeMs, long
joinStartMs, AtomicLong joinTimeMsInSingleRound) {
+ this.joinTimeMs = joinTimeMs;
+ this.joinStartMs = joinStartMs;
+ this.joinTimeMsInSingleRound = joinTimeMsInSingleRound;
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ joinStartMs = System.currentTimeMillis();
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ long elapsedMs = System.currentTimeMillis() - joinStartMs;
+ joinTimeMs.addAndGet(elapsedMs);
+ joinTimeMsInSingleRound.addAndGet(elapsedMs);
+ }
+ }
+
    /**
     * Command-line options for the consumer performance tool, parsed with jopt-simple.
     * Option declaration order below determines the order options appear in --help output.
     */
    protected static class ConsumerPerfOptions extends CommandDefaultOptions {
        private final OptionSpec<String> brokerListOpt;
        private final OptionSpec<String> bootstrapServerOpt;
        private final OptionSpec<String> topicOpt;
        private final OptionSpec<String> groupIdOpt;
        private final OptionSpec<Integer> fetchSizeOpt;
        private final OptionSpec<Void> resetBeginningOffsetOpt;
        private final OptionSpec<Integer> socketBufferSizeOpt;
        private final OptionSpec<Integer> numThreadsOpt;
        private final OptionSpec<Integer> numFetchersOpt;
        private final OptionSpec<String> consumerConfigOpt;
        private final OptionSpec<Void> printMetricsOpt;
        private final OptionSpec<Void> showDetailedStatsOpt;
        private final OptionSpec<Long> recordFetchTimeoutOpt;
        private final OptionSpec<Long> numMessagesOpt;
        private final OptionSpec<Long> reportingIntervalOpt;
        private final OptionSpec<String> dateFormatOpt;
        private final OptionSpec<Void> hideHeaderOpt;

        /**
         * Declares all options and parses {@code args}. On a parse error this prints
         * usage and invokes the exit procedure (which tests may override).
         */
        public ConsumerPerfOptions(String[] args) {
            super(args);
            // Deprecated alias for --bootstrap-server, kept for compatibility.
            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
                .withRequiredArg()
                .describedAs("broker-list")
                .ofType(String.class);
            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to.")
                .requiredUnless("broker-list")
                .withRequiredArg()
                .describedAs("server to connect to")
                .ofType(String.class);
            topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
                .withRequiredArg()
                .describedAs("topic")
                .ofType(String.class);
            // Default group id is randomized so concurrent runs don't share offsets.
            groupIdOpt = parser.accepts("group", "The group id to consume on.")
                .withRequiredArg()
                .describedAs("gid")
                .defaultsTo("perf-consumer-" + RND.nextInt(100_000))
                .ofType(String.class);
            fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
                .withRequiredArg()
                .describedAs("size")
                .ofType(Integer.class)
                .defaultsTo(1024 * 1024);
            resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
                "offset to consume from, start with the latest message present in the log rather than the earliest message.");
            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
                .withRequiredArg()
                .describedAs("size")
                .ofType(Integer.class)
                .defaultsTo(2 * 1024 * 1024);
            // The two options below are parsed only to warn users; their values are ignored.
            numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED: Number of processing threads.")
                .withRequiredArg()
                .describedAs("count")
                .ofType(Integer.class)
                .defaultsTo(10);
            numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED AND IGNORED: Number of fetcher threads.")
                .withRequiredArg()
                .describedAs("count")
                .ofType(Integer.class)
                .defaultsTo(1);
            consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
                .withRequiredArg()
                .describedAs("config file")
                .ofType(String.class);
            printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.");
            showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
                "interval as configured by reporting-interval");
            recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
                .withOptionalArg()
                .describedAs("milliseconds")
                .ofType(Long.class)
                .defaultsTo(10_000L);
            numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume")
                .withRequiredArg()
                .describedAs("count")
                .ofType(Long.class);
            // NOTE(review): withValuesConvertedBy(regex) is followed by ofType(Long.class);
            // in jopt-simple the later converter call wins, so verify the regex validation
            // actually applies and valueOf returns a Long rather than a String.
            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in milliseconds at which to print progress info.")
                .withRequiredArg()
                .withValuesConvertedBy(regex("^\\d+$"))
                .describedAs("interval_ms")
                .ofType(Long.class)
                .defaultsTo(5_000L);
            dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " +
                "See java.text.SimpleDateFormat for options.")
                .withRequiredArg()
                .describedAs("date format")
                .ofType(String.class)
                .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
            hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats");
            try {
                options = parser.parse(args);
            } catch (OptionException e) {
                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
                return;
            }
            // printUsageAndExit may not terminate the JVM when tests override the exit
            // procedure, so options can still be null here.
            if (options != null) {
                if (options.has(numThreadsOpt) || options.has(numFetchersOpt))
                    System.out.println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test");
                CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
                CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
            }
        }

        /** Whether to dump consumer metrics when the run finishes. */
        public boolean printMetrics() {
            return options.has(printMetricsOpt);
        }

        /** Bootstrap servers; --bootstrap-server takes precedence over the deprecated --broker-list. */
        public String brokerHostsAndPorts() {
            return options.valueOf(options.has(bootstrapServerOpt) ? bootstrapServerOpt : brokerListOpt);
        }

        /**
         * Builds the consumer configuration: properties from --consumer.config (if given)
         * overlaid with the values derived from the command-line options.
         *
         * @throws IOException if the --consumer.config file cannot be read
         */
        public Properties props() throws IOException {
            Properties props = (options.has(consumerConfigOpt))
                ? Utils.loadProps(options.valueOf(consumerConfigOpt))
                : new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt));
            props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString());
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                options.has(resetBeginningOffsetOpt) ? "latest" : "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            // CRC checks are disabled to measure raw consumption throughput.
            props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false");
            // Only set the default client id if the config file did not provide one.
            if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
                props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client");
            return props;
        }

        /** The single topic to consume, as a singleton set for Consumer#subscribe. */
        public Set<String> topic() {
            return Collections.singleton(options.valueOf(topicOpt));
        }

        /** Number of messages to consume before stopping. */
        public long numMessages() {
            return options.valueOf(numMessagesOpt);
        }

        /**
         * Progress reporting interval in milliseconds.
         *
         * @throws IllegalArgumentException if the interval is not positive
         */
        public long reportingIntervalMs() {
            long value = options.valueOf(reportingIntervalOpt);
            if (value <= 0)
                throw new IllegalArgumentException("Reporting interval must be greater than 0.");
            return value;
        }

        /** Whether to print a stats row every reporting interval instead of a single summary. */
        public boolean showDetailedStats() {
            return options.has(showDetailedStatsOpt);
        }

        /** Formatter for the time column; a new instance per call since SimpleDateFormat is not thread-safe. */
        public SimpleDateFormat dateFormat() {
            return new SimpleDateFormat(options.valueOf(dateFormatOpt));
        }

        /** Whether to suppress the stats header line. */
        public boolean hideHeader() {
            return options.has(hideHeaderOpt);
        }

        /** Maximum allowed gap in milliseconds between consecutive returned records. */
        public long recordFetchTimeoutMs() {
            return options.valueOf(recordFetchTimeoutOpt);
        }
    }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
new file mode 100644
index 00000000000..65a533063aa
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
 * Unit tests for {@link ConsumerPerformance}: option parsing, config precedence,
 * and consistency between the stats header and the printed progress rows.
 */
public class ConsumerPerformanceTest {
    // Replaces System.exit so option-parsing failures don't kill the test JVM.
    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");

    @TempDir
    static Path tempDir;

    @BeforeEach
    public void beforeEach() {
        Exit.setExitProcedure(exitProcedure);
    }

    @AfterEach
    public void afterEach() {
        Exit.resetExitProcedure();
    }

    // Header and body must have the same number of comma-separated columns (detailed mode).
    @Test
    public void testDetailedHeaderMatchBody() {
        testHeaderMatchContent(true, 2,
            () -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L));
    }

    // NOTE(review): this uses the same full-progress printer as the detailed case —
    // confirm the non-detailed header really has the same column count.
    @Test
    public void testNonDetailedHeaderMatchBody() {
        testHeaderMatchContent(false, 2,
            () -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1L));
    }

    // The deprecated --broker-list alone should still supply the bootstrap address.
    @Test
    public void testConfigBrokerList() {
        String[] args = new String[]{
            "--broker-list", "localhost:9092",
            "--topic", "test",
            "--messages", "10"
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("localhost:9092", config.brokerHostsAndPorts());
        assertTrue(config.topic().contains("test"));
        assertEquals(10, config.numMessages());
    }

    @Test
    public void testConfigBootStrapServer() {
        String[] args = new String[]{
            "--bootstrap-server", "localhost:9092",
            "--topic", "test",
            "--messages", "10",
            "--print-metrics"
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("localhost:9092", config.brokerHostsAndPorts());
        assertTrue(config.topic().contains("test"));
        assertEquals(10, config.numMessages());
    }

    // When both are given, --bootstrap-server must win over the deprecated --broker-list.
    @Test
    public void testBrokerListOverride() {
        String[] args = new String[]{
            "--broker-list", "localhost:9094",
            "--bootstrap-server", "localhost:9092",
            "--topic", "test",
            "--messages", "10"
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("localhost:9092", config.brokerHostsAndPorts());
        assertTrue(config.topic().contains("test"));
        assertEquals(10, config.numMessages());
    }

    // An unknown option should produce a usage error on stderr (exit is mocked).
    @Test
    public void testConfigWithUnrecognizedOption() {
        String[] args = new String[]{
            "--broker-list", "localhost:9092",
            "--topic", "test",
            "--messages", "10",
            "--new-consumer"
        };

        String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));

        assertTrue(err.contains("new-consumer is not a recognized option"));
    }

    // client.id from --consumer.config must take precedence over the built-in default.
    @Test
    public void testClientIdOverride() throws IOException {
        File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();
        try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
            output.println("client.id=consumer-1");
            output.flush();
        }

        String[] args = new String[]{
            "--broker-list", "localhost:9092",
            "--topic", "test",
            "--messages", "10",
            "--consumer.config", tempFile.getAbsolutePath()
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
    }

    // Without a config file the tool falls back to its default client id.
    @Test
    public void testDefaultClientId() throws IOException {
        String[] args = new String[]{
            "--broker-list", "localhost:9092",
            "--topic", "test",
            "--messages", "10"
        };

        ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);

        assertEquals("perf-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
    }

    /**
     * Captures stdout while printing the header and one progress row, then checks
     * that the output has the expected line count and that header and body agree
     * on the number of comma-separated columns.
     */
    private void testHeaderMatchContent(boolean detailed, int expectedOutputLineCount, Runnable runnable) {
        String out = ToolsTestUtils.captureStandardOut(() -> {
            ConsumerPerformance.printHeader(detailed);
            runnable.run();
        });

        // NOTE(review): splits on '\n' only — on Windows, println emits '\r\n';
        // the column-count comparison still holds, but line content would carry '\r'.
        String[] contents = out.split("\n");
        assertEquals(expectedOutputLineCount, contents.length);
        String header = contents[0];
        String body = contents[1];

        assertEquals(header.split(",\\s").length, body.split(",\\s").length);
    }
}