This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 07e2f6cd4d0 KAFKA-14578: Move ConsumerPerformance to tools (#13215)
07e2f6cd4d0 is described below

commit 07e2f6cd4d0ba703db510ba72354f8e500f72536
Author: Federico Valeri <[email protected]>
AuthorDate: Mon Mar 6 18:16:55 2023 +0100

    KAFKA-14578: Move ConsumerPerformance to tools (#13215)
    
    
    Reviewers: Mickael Maison <[email protected]>, Alexandre Dupriez 
<[email protected]>
---
 bin/kafka-consumer-perf-test.sh                    |   2 +-
 bin/windows/kafka-consumer-perf-test.bat           |   2 +-
 .../scala/kafka/tools/ConsumerPerformance.scala    | 306 ----------------
 core/src/main/scala/kafka/tools/PerfConfig.scala   |  39 --
 .../unit/kafka/tools/ConsumerPerformanceTest.scala | 166 ---------
 tests/kafkatest/benchmarks/core/benchmark_test.py  |   2 +-
 .../services/performance/consumer_performance.py   |   2 +-
 .../apache/kafka/tools/ConsumerPerformance.java    | 407 +++++++++++++++++++++
 .../kafka/tools/ConsumerPerformanceTest.java       | 172 +++++++++
 9 files changed, 583 insertions(+), 515 deletions(-)

diff --git a/bin/kafka-consumer-perf-test.sh b/bin/kafka-consumer-perf-test.sh
index 77cda721d6c..4eebe87a5fb 100755
--- a/bin/kafka-consumer-perf-test.sh
+++ b/bin/kafka-consumer-perf-test.sh
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance "$@"
+exec $(dirname $0)/kafka-run-class.sh 
org.apache.kafka.tools.ConsumerPerformance "$@"
diff --git a/bin/windows/kafka-consumer-perf-test.bat 
b/bin/windows/kafka-consumer-perf-test.bat
index 606c784605a..17e17d3b105 100644
--- a/bin/windows/kafka-consumer-perf-test.bat
+++ b/bin/windows/kafka-consumer-perf-test.bat
@@ -16,5 +16,5 @@ rem limitations under the License.
 
 SetLocal
 set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
-"%~dp0kafka-run-class.bat" kafka.tools.ConsumerPerformance %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConsumerPerformance %*
 EndLocal
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala 
b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
deleted file mode 100644
index 56f49456705..00000000000
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.text.SimpleDateFormat
-import java.time.Duration
-import java.util
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{Properties, Random}
-import com.typesafe.scalalogging.LazyLogging
-import joptsimple.OptionException
-import kafka.utils.ToolsUtils
-import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, 
KafkaConsumer}
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
-import org.apache.kafka.server.util.CommandLineUtils
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable
-
-/**
- * Performance test for the full zookeeper consumer
- */
-object ConsumerPerformance extends LazyLogging {
-
-  def main(args: Array[String]): Unit = {
-
-    val config = new ConsumerPerfConfig(args)
-    logger.info("Starting consumer...")
-    val totalMessagesRead = new AtomicLong(0)
-    val totalBytesRead = new AtomicLong(0)
-    var metrics: mutable.Map[MetricName, _ <: Metric] = null
-    val joinGroupTimeInMs = new AtomicLong(0)
-
-    if (!config.hideHeader)
-      printHeader(config.showDetailedStats)
-
-    var startMs, endMs = 0L
-    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
-    startMs = System.currentTimeMillis
-    consume(consumer, List(config.topic), config.numMessages, 
config.recordFetchTimeoutMs, config, totalMessagesRead, totalBytesRead, 
joinGroupTimeInMs, startMs)
-    endMs = System.currentTimeMillis
-
-    if (config.printMetrics) {
-      metrics = consumer.metrics.asScala
-    }
-    consumer.close()
-    val elapsedSecs = (endMs - startMs) / 1000.0
-    val fetchTimeInMs = (endMs - startMs) - joinGroupTimeInMs.get
-    if (!config.showDetailedStats) {
-      val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
-      println("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f".format(
-        config.dateFormat.format(startMs),
-        config.dateFormat.format(endMs),
-        totalMBRead,
-        totalMBRead / elapsedSecs,
-        totalMessagesRead.get,
-        totalMessagesRead.get / elapsedSecs,
-        joinGroupTimeInMs.get,
-        fetchTimeInMs,
-        totalMBRead / (fetchTimeInMs / 1000.0),
-        totalMessagesRead.get / (fetchTimeInMs / 1000.0)
-      ))
-    }
-
-    if (metrics != null) {
-      ToolsUtils.printMetrics(metrics)
-    }
-
-  }
-
-  private[tools] def printHeader(showDetailedStats: Boolean): Unit = {
-    val newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, 
fetch.nMsg.sec"
-    if (!showDetailedStats)
-      println("start.time, end.time, data.consumed.in.MB, MB.sec, 
data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader)
-    else
-      println("time, threadId, data.consumed.in.MB, MB.sec, 
data.consumed.in.nMsg, nMsg.sec" + newFieldsInHeader)
-  }
-
-  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
-              topics: List[String],
-              count: Long,
-              timeout: Long,
-              config: ConsumerPerfConfig,
-              totalMessagesRead: AtomicLong,
-              totalBytesRead: AtomicLong,
-              joinTime: AtomicLong,
-              testStartTime: Long): Unit = {
-    var bytesRead = 0L
-    var messagesRead = 0L
-    var lastBytesRead = 0L
-    var lastMessagesRead = 0L
-    var joinStart = System.currentTimeMillis
-    var joinTimeMsInSingleRound = 0L
-
-    consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
-      def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinTime.addAndGet(System.currentTimeMillis - joinStart)
-        joinTimeMsInSingleRound += System.currentTimeMillis - joinStart
-      }
-      def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): 
Unit = {
-        joinStart = System.currentTimeMillis
-      }})
-
-    // Now start the benchmark
-    var currentTimeMillis = System.currentTimeMillis
-    var lastReportTime: Long = currentTimeMillis
-    var lastConsumedTime = currentTimeMillis
-
-    while (messagesRead < count && currentTimeMillis - lastConsumedTime <= 
timeout) {
-      val records = consumer.poll(Duration.ofMillis(100)).asScala
-      currentTimeMillis = System.currentTimeMillis
-      if (records.nonEmpty)
-        lastConsumedTime = currentTimeMillis
-      for (record <- records) {
-        messagesRead += 1
-        if (record.key != null)
-          bytesRead += record.key.size
-        if (record.value != null)
-          bytesRead += record.value.size
-
-        if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
-          if (config.showDetailedStats)
-            printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, 
lastMessagesRead,
-              lastReportTime, currentTimeMillis, config.dateFormat, 
joinTimeMsInSingleRound)
-          joinTimeMsInSingleRound = 0L
-          lastReportTime = currentTimeMillis
-          lastMessagesRead = messagesRead
-          lastBytesRead = bytesRead
-        }
-      }
-    }
-
-    if (messagesRead < count)
-      println(s"WARNING: Exiting before consuming the expected number of 
messages: timeout ($timeout ms) exceeded. " +
-        "You can use the --timeout option to increase the timeout.")
-    totalMessagesRead.set(messagesRead)
-    totalBytesRead.set(bytesRead)
-  }
-
-  def printConsumerProgress(id: Int,
-                               bytesRead: Long,
-                               lastBytesRead: Long,
-                               messagesRead: Long,
-                               lastMessagesRead: Long,
-                               startMs: Long,
-                               endMs: Long,
-                               dateFormat: SimpleDateFormat,
-                               periodicJoinTimeInMs: Long): Unit = {
-    printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, 
lastMessagesRead, startMs, endMs, dateFormat)
-    printExtendedProgress(bytesRead, lastBytesRead, messagesRead, 
lastMessagesRead, startMs, endMs, periodicJoinTimeInMs)
-    println()
-  }
-
-  private def printBasicProgress(id: Int,
-                                 bytesRead: Long,
-                                 lastBytesRead: Long,
-                                 messagesRead: Long,
-                                 lastMessagesRead: Long,
-                                 startMs: Long,
-                                 endMs: Long,
-                                 dateFormat: SimpleDateFormat): Unit = {
-    val elapsedMs: Double = (endMs - startMs).toDouble
-    val totalMbRead = (bytesRead * 1.0) / (1024 * 1024)
-    val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
-    val intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs
-    val intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / 
elapsedMs) * 1000.0
-    print("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, 
totalMbRead,
-      intervalMbPerSec, messagesRead, intervalMessagesPerSec))
-  }
-
-  private def printExtendedProgress(bytesRead: Long,
-                                    lastBytesRead: Long,
-                                    messagesRead: Long,
-                                    lastMessagesRead: Long,
-                                    startMs: Long,
-                                    endMs: Long,
-                                    periodicJoinTimeInMs: Long): Unit = {
-    val fetchTimeMs = endMs - startMs - periodicJoinTimeInMs
-    val intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
-    val intervalMessagesRead = messagesRead - lastMessagesRead
-    val (intervalMbPerSec, intervalMessagesPerSec) = if (fetchTimeMs <= 0)
-      (0.0, 0.0)
-    else
-      (1000.0 * intervalMbRead / fetchTimeMs, 1000.0 * intervalMessagesRead / 
fetchTimeMs)
-    print(", %d, %d, %.4f, %.4f".format(periodicJoinTimeInMs, fetchTimeMs, 
intervalMbPerSec, intervalMessagesPerSec))
-  }
-
-  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use 
--bootstrap-server instead; ignored if --bootstrap-server is specified.  The 
broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-      .withRequiredArg
-      .describedAs("broker-list")
-      .ofType(classOf[String])
-    val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED 
unless --broker-list(deprecated) is specified. The server(s) to connect to.")
-      .requiredUnless("broker-list")
-      .withRequiredArg
-      .describedAs("server to connect to")
-      .ofType(classOf[String])
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume 
from.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
-      .withRequiredArg
-      .describedAs("gid")
-      .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
-      .ofType(classOf[String])
-    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to 
fetch in a single request.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1024 * 1024)
-    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the 
consumer does not already have an established " +
-      "offset to consume from, start with the latest message present in the 
log rather than the earliest message.")
-    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size 
of the tcp RECV size.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(2 * 1024 * 1024)
-    val numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED: 
Number of processing threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(10)
-    val numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED AND 
IGNORED: Number of fetcher threads.")
-      .withRequiredArg
-      .describedAs("count")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1)
-    val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config 
properties file.")
-      .withRequiredArg
-      .describedAs("config file")
-      .ofType(classOf[String])
-    val printMetricsOpt = parser.accepts("print-metrics", "Print out the 
metrics.")
-    val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, 
stats are reported for each reporting " +
-      "interval as configured by reporting-interval")
-    val recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed 
time in milliseconds between returned records.")
-      .withOptionalArg()
-      .describedAs("milliseconds")
-      .ofType(classOf[Long])
-      .defaultsTo(10000)
-
-    try
-      options = parser.parse(args: _*)
-    catch {
-      case e: OptionException =>
-        CommandLineUtils.printUsageAndExit(parser, e.getMessage)
-    }
-
-    if(options.has(numThreadsOpt) || options.has(numFetchersOpt))
-      println("WARNING: option [threads] and [num-fetch-threads] have been 
deprecated and will be ignored by the test")
-
-    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps in 
performance test for the full zookeeper consumer")
-
-    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, 
numMessagesOpt)
-
-    val printMetrics = options.has(printMetricsOpt)
-
-    val props = if (options.has(consumerConfigOpt))
-      Utils.loadProps(options.valueOf(consumerConfigOpt))
-    else
-      new Properties
-
-    import org.apache.kafka.clients.consumer.ConsumerConfig
-
-    val brokerHostsAndPorts = options.valueOf(if 
(options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts)
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
-    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
options.valueOf(socketBufferSizeOpt).toString)
-    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
options.valueOf(fetchSizeOpt).toString)
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if 
(options.has(resetBeginningOffsetOpt)) "latest" else "earliest")
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
classOf[ByteArrayDeserializer])
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
classOf[ByteArrayDeserializer])
-    props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
-    if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
-      props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client")
-
-    val numThreads = options.valueOf(numThreadsOpt).intValue
-    val topic = options.valueOf(topicOpt)
-    val numMessages = options.valueOf(numMessagesOpt).longValue
-    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
-    if (reportingInterval <= 0)
-      throw new IllegalArgumentException("Reporting interval must be greater 
than 0.")
-    val showDetailedStats = options.has(showDetailedStatsOpt)
-    val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
-    val hideHeader = options.has(hideHeaderOpt)
-    val recordFetchTimeoutMs = 
options.valueOf(recordFetchTimeoutOpt).longValue()
-  }
-
-}
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala 
b/core/src/main/scala/kafka/tools/PerfConfig.scala
deleted file mode 100644
index 6857e401e80..00000000000
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.tools
-
-import org.apache.kafka.server.util.CommandDefaultOptions
-
-class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) {
-  val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of 
messages to send or consume")
-    .withRequiredArg
-    .describedAs("count")
-    .ofType(classOf[java.lang.Long])
-  val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in 
milliseconds at which to print progress info.")
-    .withRequiredArg
-    .describedAs("interval_ms")
-    .ofType(classOf[java.lang.Integer])
-    .defaultsTo(5000)
-  val dateFormatOpt = parser.accepts("date-format", "The date format to use 
for formatting the time field. " +
-    "See java.text.SimpleDateFormat for options.")
-    .withRequiredArg
-    .describedAs("date format")
-    .ofType(classOf[String])
-    .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS")
-  val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing 
the header for the stats ")
-}
diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
deleted file mode 100644
index d872a40bc1d..00000000000
--- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io.{ByteArrayOutputStream, PrintWriter}
-import java.text.SimpleDateFormat
-import kafka.utils.{Exit, TestUtils}
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
-import org.junit.jupiter.api.Test
-
-class ConsumerPerformanceTest {
-  private val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")
-
-  @Test
-  def testDetailedHeaderMatchBody(): Unit = {
-    testHeaderMatchContent(detailed = true, 2,
-      () => ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 1, 0, 
0, 1, dateFormat, 1L))
-  }
-
-  @Test
-  def testNonDetailedHeaderMatchBody(): Unit = {
-    testHeaderMatchContent(detailed = false, 2, () => 
println(s"${dateFormat.format(System.currentTimeMillis)}, " +
-      s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0, 1, 
1, 1.1, 1.1"))
-  }
-
-  @Test
-  def testConfigBrokerList(): Unit = {
-    //Given
-    val args: Array[String] = Array(
-      "--broker-list", "localhost:9092",
-      "--topic", "test",
-      "--messages", "10"
-    )
-
-    //When
-    val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
-    //Then
-    assertEquals("localhost:9092", config.brokerHostsAndPorts)
-    assertEquals("test", config.topic)
-    assertEquals(10, config.numMessages)
-  }
-
-  @Test
-  def testConfigBootStrapServer(): Unit = {
-    //Given
-    val args: Array[String] = Array(
-      "--bootstrap-server", "localhost:9092",
-      "--topic", "test",
-      "--messages", "10",
-      "--print-metrics"
-    )
-
-    //When
-    val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
-    //Then
-    assertEquals("localhost:9092", config.brokerHostsAndPorts)
-    assertEquals("test", config.topic)
-    assertEquals(10, config.numMessages)
-  }
-
-  @Test
-  def testBrokerListOverride(): Unit = {
-    //Given
-    val args: Array[String] = Array(
-      "--broker-list", "localhost:9094",
-      "--bootstrap-server", "localhost:9092",
-      "--topic", "test",
-      "--messages", "10"
-    )
-
-    //When
-    val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
-    //Then
-    assertEquals("localhost:9092", config.brokerHostsAndPorts)
-    assertEquals("test", config.topic)
-    assertEquals(10, config.numMessages)
-  }
-
-  @Test
-  def testConfigWithUnrecognizedOption(): Unit = {
-    Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull))
-    //Given
-    val args: Array[String] = Array(
-      "--broker-list", "localhost:9092",
-      "--topic", "test",
-      "--messages", "10",
-      "--new-consumer"
-    )
-    try assertThrows(classOf[IllegalArgumentException], () => new 
ConsumerPerformance.ConsumerPerfConfig(args))
-    finally Exit.resetExitProcedure()
-  }
-
-  @Test
-  def testClientIdOverride(): Unit = {
-    val consumerConfigFile = TestUtils.tempFile("test_consumer_config",".conf")
-    new PrintWriter(consumerConfigFile.getPath) { 
write("client.id=consumer-1"); close() }
-
-    //Given
-    val args: Array[String] = Array(
-      "--broker-list", "localhost:9092",
-      "--topic", "test",
-      "--messages", "10",
-      "--consumer.config", consumerConfigFile.getPath
-    )
-
-    //When
-    val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
-    //Then
-    assertEquals("consumer-1", 
config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
-  }
-
-  @Test
-  def testDefaultClientId(): Unit = {
-    //Given
-    val args: Array[String] = Array(
-      "--broker-list", "localhost:9092",
-      "--topic", "test",
-      "--messages", "10"
-    )
-
-    //When
-    val config = new ConsumerPerformance.ConsumerPerfConfig(args)
-
-    //Then
-    assertEquals("perf-consumer-client", 
config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
-  }
-
-  private def testHeaderMatchContent(detailed: Boolean, 
expectedOutputLineCount: Int, fun: () => Unit): Unit = {
-    val outContent = new ByteArrayOutputStream
-    try {
-      Console.withOut(outContent) {
-        ConsumerPerformance.printHeader(detailed)
-        fun()
-
-        val contents = outContent.toString.split("\n")
-        assertEquals(expectedOutputLineCount, contents.length)
-        val header = contents(0)
-        val body = contents(1)
-
-        assertEquals(header.split(",\\s").length, body.split(",\\s").length)
-      }
-    } finally {
-      outContent.close()
-    }
-  }
-}
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py 
b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 959f23a366c..497569650cc 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -202,7 +202,7 @@ class Benchmark(Test):
 
         Return aggregate throughput statistics for both producer and consumer.
 
-        (Under the hood, this runs ProducerPerformance.java, and 
ConsumerPerformance.scala)
+        (Under the hood, this runs ProducerPerformance.java, and 
ConsumerPerformance.java)
         """
         client_version = KafkaVersion(client_version)
         broker_version = KafkaVersion(broker_version)
diff --git a/tests/kafkatest/services/performance/consumer_performance.py 
b/tests/kafkatest/services/performance/consumer_performance.py
index 6df8dfb6be8..ed7cc99f86a 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -23,7 +23,7 @@ from kafkatest.version import DEV_BRANCH, V_2_0_0, 
LATEST_0_10_0
 
 class ConsumerPerformanceService(PerformanceService):
     """
-        See ConsumerPerformance.scala as the source of truth on these 
settings, but for reference:
+        See ConsumerPerformance tool as the source of truth on these settings, 
but for reference:
 
         "zookeeper" "The connection string for the zookeeper connection in the 
form host:port. Multiple URLS can
                      be given to allow fail-over. This option is only used 
with the old consumer."
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
new file mode 100644
index 00000000000..56c51f34a84
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.ToolsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static joptsimple.util.RegexMatcher.regex;
+
+public class ConsumerPerformance {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConsumerPerformance.class);
+    private static final Random RND = new Random();
+
+    public static void main(String[] args) {
+        try {
+            LOG.info("Starting consumer...");
+            ConsumerPerfOptions options = new ConsumerPerfOptions(args);
+            AtomicLong totalMessagesRead = new AtomicLong(0);
+            AtomicLong totalBytesRead = new AtomicLong(0);
+            AtomicLong joinTimeMs = new AtomicLong(0);
+            AtomicLong joinTimeMsInSingleRound = new AtomicLong(0);
+
+            if (!options.hideHeader())
+                printHeader(options.showDetailedStats());
+
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(options.props());
+            long bytesRead = 0L;
+            long messagesRead = 0L;
+            long lastBytesRead = 0L;
+            long lastMessagesRead = 0L;
+            long currentTimeMs = System.currentTimeMillis();
+            long joinStartMs = currentTimeMs;
+            long startMs = currentTimeMs;
+            consume(consumer, options, totalMessagesRead, totalBytesRead, 
joinTimeMs,
+                bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
+                joinStartMs, joinTimeMsInSingleRound);
+            long endMs = System.currentTimeMillis();
+
+            Map<MetricName, ? extends Metric> metrics = null;
+            if (options.printMetrics())
+                metrics = consumer.metrics();
+            consumer.close();
+
+            // print final stats
+            double elapsedSec = (endMs - startMs) / 1_000.0;
+            long fetchTimeInMs = (endMs - startMs) - joinTimeMs.get();
+            if (!options.showDetailedStats()) {
+                double totalMbRead = (totalBytesRead.get() * 1.0) / (1024 * 
1024);
+                System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, 
%.4f%n",
+                    options.dateFormat().format(startMs),
+                    options.dateFormat().format(endMs),
+                    totalMbRead,
+                    totalMbRead / elapsedSec,
+                    totalMessagesRead.get(),
+                    totalMessagesRead.get() / elapsedSec,
+                    joinTimeMs.get(),
+                    fetchTimeInMs,
+                    totalMbRead / (fetchTimeInMs / 1000.0),
+                    totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
+                );
+            }
+
+            if (metrics != null)
+                ToolsUtils.printMetrics(metrics);
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            Exit.exit(1);
+        }
+    }
+
+    protected static void printHeader(boolean showDetailedStats) {
+        String newFieldsInHeader = ", rebalance.time.ms, fetch.time.ms, 
fetch.MB.sec, fetch.nMsg.sec";
+        if (!showDetailedStats)
+            System.out.printf("start.time, end.time, data.consumed.in.MB, 
MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader);
+        else
+            System.out.printf("time, threadId, data.consumed.in.MB, MB.sec, 
data.consumed.in.nMsg, nMsg.sec%s%n", newFieldsInHeader);
+    }
+
+    private static void consume(KafkaConsumer<byte[], byte[]> consumer,
+                                ConsumerPerfOptions options,
+                                AtomicLong totalMessagesRead,
+                                AtomicLong totalBytesRead,
+                                AtomicLong joinTimeMs,
+                                long bytesRead,
+                                long messagesRead,
+                                long lastBytesRead,
+                                long lastMessagesRead,
+                                long joinStartMs,
+                                AtomicLong joinTimeMsInSingleRound) {
+        long numMessages = options.numMessages();
+        long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
+        long reportingIntervalMs = options.reportingIntervalMs();
+        boolean showDetailedStats = options.showDetailedStats();
+        SimpleDateFormat dateFormat = options.dateFormat();
+        consumer.subscribe(options.topic(),
+            new ConsumerPerfRebListener(joinTimeMs, joinStartMs, 
joinTimeMsInSingleRound));
+
+        // now start the benchmark
+        long currentTimeMs = System.currentTimeMillis();
+        long lastReportTimeMs = currentTimeMs;
+        long lastConsumedTimeMs = currentTimeMs;
+
+        while (messagesRead < numMessages && currentTimeMs - 
lastConsumedTimeMs <= recordFetchTimeoutMs) {
+            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(100));
+            currentTimeMs = System.currentTimeMillis();
+            if (!records.isEmpty())
+                lastConsumedTimeMs = currentTimeMs;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                messagesRead += 1;
+                if (record.key() != null)
+                    bytesRead += record.key().length;
+                if (record.value() != null)
+                    bytesRead += record.value().length;
+                if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
+                    if (showDetailedStats)
+                        printConsumerProgress(0, bytesRead, lastBytesRead, 
messagesRead, lastMessagesRead,
+                            lastReportTimeMs, currentTimeMs, dateFormat, 
joinTimeMsInSingleRound.get());
+                    joinTimeMsInSingleRound = new AtomicLong(0);
+                    lastReportTimeMs = currentTimeMs;
+                    lastMessagesRead = messagesRead;
+                    lastBytesRead = bytesRead;
+                }
+            }
+        }
+
+        if (messagesRead < numMessages)
+            System.out.printf("WARNING: Exiting before consuming the expected 
number of messages: timeout (%d ms) exceeded. " +
+                "You can use the --timeout option to increase the timeout.%n", 
recordFetchTimeoutMs);
+        totalMessagesRead.set(messagesRead);
+        totalBytesRead.set(bytesRead);
+    }
+
+    protected static void printConsumerProgress(int id,
+                                                long bytesRead,
+                                                long lastBytesRead,
+                                                long messagesRead,
+                                                long lastMessagesRead,
+                                                long startMs,
+                                                long endMs,
+                                                SimpleDateFormat dateFormat,
+                                                long joinTimeMsInSingleRound) {
+        printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, 
lastMessagesRead, startMs, endMs, dateFormat);
+        printExtendedProgress(bytesRead, lastBytesRead, messagesRead, 
lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
+        System.out.println();
+    }
+
+    private static void printBasicProgress(int id,
+                                           long bytesRead,
+                                           long lastBytesRead,
+                                           long messagesRead,
+                                           long lastMessagesRead,
+                                           long startMs,
+                                           long endMs,
+                                           SimpleDateFormat dateFormat) {
+        double elapsedMs = endMs - startMs;
+        double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
+        double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 
1024);
+        double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
+        double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / 
elapsedMs) * 1000.0;
+        System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", 
dateFormat.format(endMs), id,
+            totalMbRead, intervalMbPerSec, messagesRead, 
intervalMessagesPerSec);
+    }
+
+    private static void printExtendedProgress(long bytesRead,
+                                              long lastBytesRead,
+                                              long messagesRead,
+                                              long lastMessagesRead,
+                                              long startMs,
+                                              long endMs,
+                                              long joinTimeMsInSingleRound) {
+        long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
+        double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 
1024);
+        long intervalMessagesRead = messagesRead - lastMessagesRead;
+        double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * 
intervalMbRead / fetchTimeMs;
+        double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * 
intervalMessagesRead / fetchTimeMs;
+        System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
+            fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
+    }
+
+    public static class ConsumerPerfRebListener implements 
ConsumerRebalanceListener {
+        private AtomicLong joinTimeMs;
+        private AtomicLong joinTimeMsInSingleRound;
+        private long joinStartMs;
+
+        public ConsumerPerfRebListener(AtomicLong joinTimeMs, long 
joinStartMs, AtomicLong joinTimeMsInSingleRound) {
+            this.joinTimeMs = joinTimeMs;
+            this.joinStartMs = joinStartMs;
+            this.joinTimeMsInSingleRound = joinTimeMsInSingleRound;
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
+            joinStartMs = System.currentTimeMillis();
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+            long elapsedMs = System.currentTimeMillis() - joinStartMs;
+            joinTimeMs.addAndGet(elapsedMs);
+            joinTimeMsInSingleRound.addAndGet(elapsedMs);
+        }
+    }
+
+    protected static class ConsumerPerfOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> brokerListOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> groupIdOpt;
+        private final OptionSpec<Integer> fetchSizeOpt;
+        private final OptionSpec<Void> resetBeginningOffsetOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<Integer> numThreadsOpt;
+        private final OptionSpec<Integer> numFetchersOpt;
+        private final OptionSpec<String> consumerConfigOpt;
+        private final OptionSpec<Void> printMetricsOpt;
+        private final OptionSpec<Void> showDetailedStatsOpt;
+        private final OptionSpec<Long> recordFetchTimeoutOpt;
+        private final OptionSpec<Long> numMessagesOpt;
+        private final OptionSpec<Long> reportingIntervalOpt;
+        private final OptionSpec<String> dateFormatOpt;
+        private final OptionSpec<Void> hideHeaderOpt;
+
+        public ConsumerPerfOptions(String[] args) {
+            super(args);
+            brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use 
--bootstrap-server instead; ignored if --bootstrap-server is specified. The 
broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+                .withRequiredArg()
+                .describedAs("broker-list")
+                .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED 
unless --broker-list(deprecated) is specified. The server(s) to connect to.")
+                .requiredUnless("broker-list")
+                .withRequiredArg()
+                .describedAs("server to connect to")
+                .ofType(String.class);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume 
from.")
+                .withRequiredArg()
+                .describedAs("topic")
+                .ofType(String.class);
+            groupIdOpt = parser.accepts("group", "The group id to consume on.")
+                .withRequiredArg()
+                .describedAs("gid")
+                .defaultsTo("perf-consumer-" + RND.nextInt(100_000))
+                .ofType(String.class);
+            fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to 
fetch in a single request.")
+                .withRequiredArg()
+                .describedAs("size")
+                .ofType(Integer.class)
+                .defaultsTo(1024 * 1024);
+            resetBeginningOffsetOpt = parser.accepts("from-latest", "If the 
consumer does not already have an established " +
+                "offset to consume from, start with the latest message present 
in the log rather than the earliest message.");
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The 
size of the tcp RECV size.")
+                .withRequiredArg()
+                .describedAs("size")
+                .ofType(Integer.class)
+                .defaultsTo(2 * 1024 * 1024);
+            numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED: 
Number of processing threads.")
+                .withRequiredArg()
+                .describedAs("count")
+                .ofType(Integer.class)
+                .defaultsTo(10);
+            numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED 
AND IGNORED: Number of fetcher threads.")
+                .withRequiredArg()
+                .describedAs("count")
+                .ofType(Integer.class)
+                .defaultsTo(1);
+            consumerConfigOpt = parser.accepts("consumer.config", "Consumer 
config properties file.")
+                .withRequiredArg()
+                .describedAs("config file")
+                .ofType(String.class);
+            printMetricsOpt = parser.accepts("print-metrics", "Print out the 
metrics.");
+            showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If 
set, stats are reported for each reporting " +
+                "interval as configured by reporting-interval");
+            recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum 
allowed time in milliseconds between returned records.")
+                .withOptionalArg()
+                .describedAs("milliseconds")
+                .ofType(Long.class)
+                .defaultsTo(10_000L);
+            numMessagesOpt = parser.accepts("messages", "REQUIRED: The number 
of messages to send or consume")
+                .withRequiredArg()
+                .describedAs("count")
+                .ofType(Long.class);
+            reportingIntervalOpt = parser.accepts("reporting-interval", 
"Interval in milliseconds at which to print progress info.")
+                .withRequiredArg()
+                .withValuesConvertedBy(regex("^\\d+$"))
+                .describedAs("interval_ms")
+                .ofType(Long.class)
+                .defaultsTo(5_000L);
+            dateFormatOpt = parser.accepts("date-format", "The date format to 
use for formatting the time field. " +
+                    "See java.text.SimpleDateFormat for options.")
+                .withRequiredArg()
+                .describedAs("date format")
+                .ofType(String.class)
+                .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
+            hideHeaderOpt = parser.accepts("hide-header", "If set, skips 
printing the header for the stats");
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+                return;
+            }
+            if (options != null) {
+                if (options.has(numThreadsOpt) || options.has(numFetchersOpt))
+                    System.out.println("WARNING: option [threads] and 
[num-fetch-threads] have been deprecated and will be ignored by the test");
+                CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is 
used to verify the consumer performance.");
+                CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, 
numMessagesOpt);
+            }
+        }
+
+        public boolean printMetrics() {
+            return options.has(printMetricsOpt);
+        }
+
+        public String brokerHostsAndPorts() {
+            return options.valueOf(options.has(bootstrapServerOpt) ? 
bootstrapServerOpt : brokerListOpt);
+        }
+
+        public Properties props() throws IOException {
+            Properties props = (options.has(consumerConfigOpt))
+                ? Utils.loadProps(options.valueOf(consumerConfigOpt))
+                : new Properties();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerHostsAndPorts());
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, 
options.valueOf(groupIdOpt));
+            props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
options.valueOf(socketBufferSizeOpt).toString());
+            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
options.valueOf(fetchSizeOpt).toString());
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                options.has(resetBeginningOffsetOpt) ? "latest" : "earliest");
+            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+            props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false");
+            if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
+                props.put(ConsumerConfig.CLIENT_ID_CONFIG, 
"perf-consumer-client");
+            return props;
+        }
+
+        public Set<String> topic() {
+            return Collections.singleton(options.valueOf(topicOpt));
+        }
+
+        public long numMessages() {
+            return options.valueOf(numMessagesOpt);
+        }
+
+        public long reportingIntervalMs() {
+            long value = options.valueOf(reportingIntervalOpt);
+            if (value <= 0)
+                throw new IllegalArgumentException("Reporting interval must be 
greater than 0.");
+            return value;
+        }
+
+        public boolean showDetailedStats() {
+            return options.has(showDetailedStatsOpt);
+        }
+
+        public SimpleDateFormat dateFormat() {
+            return new SimpleDateFormat(options.valueOf(dateFormatOpt));
+        }
+
+        public boolean hideHeader() {
+            return options.has(hideHeaderOpt);
+        }
+
+        public long recordFetchTimeoutMs() {
+            return options.valueOf(recordFetchTimeoutOpt);
+        }
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
new file mode 100644
index 00000000000..65a533063aa
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.text.SimpleDateFormat;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerPerformanceTest {
+    private final ToolsTestUtils.MockExitProcedure exitProcedure = new 
ToolsTestUtils.MockExitProcedure();
+    private final SimpleDateFormat dateFormat = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
+
+    @TempDir
+    static Path tempDir;
+
+    @BeforeEach
+    public void beforeEach() {
+        Exit.setExitProcedure(exitProcedure);
+    }
+
+    @AfterEach
+    public void afterEach() {
+        Exit.resetExitProcedure();
+    }
+
+    @Test
+    public void testDetailedHeaderMatchBody() {
+        testHeaderMatchContent(true, 2,
+            () -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 
1, 0, 0, 1, dateFormat, 1L));
+    }
+
+    @Test
+    public void testNonDetailedHeaderMatchBody() {
+        testHeaderMatchContent(false, 2,
+            () -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0, 
1, 0, 0, 1, dateFormat, 1L));
+    }
+
+    @Test
+    public void testConfigBrokerList() {
+        String[] args = new String[]{
+            "--broker-list", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("localhost:9092", config.brokerHostsAndPorts());
+        assertTrue(config.topic().contains("test"));
+        assertEquals(10, config.numMessages());
+    }
+
+    @Test
+    public void testConfigBootStrapServer() {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10",
+            "--print-metrics"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("localhost:9092", config.brokerHostsAndPorts());
+        assertTrue(config.topic().contains("test"));
+        assertEquals(10, config.numMessages());
+    }
+
+    @Test
+    public void testBrokerListOverride() {
+        String[] args = new String[]{
+            "--broker-list", "localhost:9094",
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("localhost:9092", config.brokerHostsAndPorts());
+        assertTrue(config.topic().contains("test"));
+        assertEquals(10, config.numMessages());
+    }
+
+    @Test
+    public void testConfigWithUnrecognizedOption() {
+        String[] args = new String[]{
+            "--broker-list", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10",
+            "--new-consumer"
+        };
+
+        String err = ToolsTestUtils.captureStandardErr(() -> new 
ConsumerPerformance.ConsumerPerfOptions(args));
+
+        assertTrue(err.contains("new-consumer is not a recognized option"));
+    }
+
+    @Test
+    public void testClientIdOverride() throws IOException {
+        File tempFile = 
Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();
+        try (PrintWriter output = new 
PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
+            output.println("client.id=consumer-1");
+            output.flush();
+        }
+
+        String[] args = new String[]{
+            "--broker-list", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10",
+            "--consumer.config", tempFile.getAbsolutePath()
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("consumer-1", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testDefaultClientId() throws IOException {
+        String[] args = new String[]{
+            "--broker-list", "localhost:9092",
+            "--topic", "test",
+            "--messages", "10"
+        };
+
+        ConsumerPerformance.ConsumerPerfOptions config = new 
ConsumerPerformance.ConsumerPerfOptions(args);
+
+        assertEquals("perf-consumer-client", 
config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
+    }
+
+    private void testHeaderMatchContent(boolean detailed, int 
expectedOutputLineCount, Runnable runnable) {
+        String out = ToolsTestUtils.captureStandardOut(() -> {
+            ConsumerPerformance.printHeader(detailed);
+            runnable.run();
+        });
+
+        String[] contents = out.split("\n");
+        assertEquals(expectedOutputLineCount, contents.length);
+        String header = contents[0];
+        String body = contents[1];
+
+        assertEquals(header.split(",\\s").length, body.split(",\\s").length);
+    }
+}

Reply via email to