This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 672c6172334 KAFKA-14577: Move ConsoleProducer to tools module (#17157)
672c6172334 is described below

commit 672c6172334f5c1c25702abaa174eb25858af286
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Oct 7 14:19:59 2024 +0500

    KAFKA-14577: Move ConsoleProducer to tools module (#17157)
    
    
    Reviewers: Mickael Maison <[email protected]>, Federico Valeri 
<[email protected]>
---
 bin/kafka-console-producer.sh                      |   2 +-
 bin/windows/kafka-console-producer.bat             |   2 +-
 checkstyle/import-control.xml                      |   2 +
 checkstyle/suppressions.xml                        |   2 +-
 .../main/scala/kafka/tools/ConsoleProducer.scala   | 426 ---------------------
 core/src/main/scala/kafka/utils/ToolsUtils.scala   |  16 +-
 .../unit/kafka/tools/ConsoleProducerTest.scala     | 259 -------------
 .../unit/kafka/tools/LineMessageReaderTest.scala   | 354 -----------------
 .../native-image-configs/reflect-config.json       |   2 +-
 .../org/apache/kafka/tools/ConsoleProducer.java    | 349 +++++++++++++++++
 .../org/apache/kafka/tools/LineMessageReader.java  | 218 +++++++++++
 .../java/org/apache/kafka/tools/ToolsUtils.java    |   3 +-
 .../apache/kafka/tools/ConsoleProducerTest.java    | 252 ++++++++++++
 .../apache/kafka/tools/LineMessageReaderTest.java  | 403 +++++++++++++++++++
 .../org/apache/kafka/tools/api/RecordReader.java   |   2 +-
 15 files changed, 1231 insertions(+), 1061 deletions(-)

diff --git a/bin/kafka-console-producer.sh b/bin/kafka-console-producer.sh
index e5187b8b533..a0929407c0a 100755
--- a/bin/kafka-console-producer.sh
+++ b/bin/kafka-console-producer.sh
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConsoleProducer 
"$@"
diff --git a/bin/windows/kafka-console-producer.bat 
b/bin/windows/kafka-console-producer.bat
index e1834bc5a85..d572c7af04e 100644
--- a/bin/windows/kafka-console-producer.bat
+++ b/bin/windows/kafka-console-producer.bat
@@ -16,5 +16,5 @@ rem limitations under the License.
 
 SetLocal
 set KAFKA_HEAP_OPTS=-Xmx512M
-"%~dp0kafka-run-class.bat" kafka.tools.ConsoleProducer %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConsoleProducer %*
 EndLocal
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 74c99f834e7..834d51b7a89 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -312,6 +312,8 @@
     <allow pkg="scala.collection" />
     <allow pkg="org.apache.kafka.coordinator.transaction" />
     <allow pkg="org.apache.kafka.coordinator.group" />
+    <allow pkg="org.apache.kafka.tools" />
+    <allow pkg="org.apache.kafka.tools.api" />
 
     <subpackage name="consumer">
       <allow pkg="org.apache.kafka.tools"/>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 099d99a80de..4b243252f67 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -271,7 +271,7 @@
     <suppress checks="BooleanExpressionComplexity"
               files="(StreamsResetter|DefaultMessageFormatter).java"/>
     <suppress checks="NPathComplexity"
-              
files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/>
+              
files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
     <suppress checks="ImportControl"
               files="SignalLogger.java"/>
     <suppress checks="IllegalImport"
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
deleted file mode 100644
index 64df24e4270..00000000000
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ /dev/null
@@ -1,426 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.util.Properties
-import java.util.regex.Pattern
-import joptsimple.{OptionException, OptionParser, OptionSet, OptionSpec}
-import kafka.utils.Implicits._
-import kafka.utils.{Logging, ToolsUtils}
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{KafkaProducer, Producer, 
ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.utils.{Exit, Utils}
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-import org.apache.kafka.tools.api.RecordReader
-
-import java.lang
-
-object ConsoleProducer extends Logging {
-
-  private[tools] def newReader(className: String, prop: Properties): 
RecordReader = {
-    val reader = 
Class.forName(className).getDeclaredConstructor().newInstance()
-    reader match {
-      case r: RecordReader =>
-        r.configure(prop.asInstanceOf[java.util.Map[String, _]])
-        r
-      case _ => throw new IllegalArgumentException(f"the reader must implement 
${classOf[RecordReader].getName}")
-    }
-  }
-
-  private[tools] def loopReader(producer: Producer[Array[Byte], Array[Byte]],
-                               reader: RecordReader,
-                                inputStream: InputStream,
-                               sync: Boolean): Unit = {
-    val iter = reader.readRecords(inputStream)
-    try while (iter.hasNext) send(producer, iter.next(), sync) finally 
reader.close()
-  }
-
-  def main(args: Array[String]): Unit = {
-
-    try {
-      val config = new ProducerConfig(args)
-      val input = System.in
-      val producer = new KafkaProducer[Array[Byte], 
Array[Byte]](producerProps(config))
-      try loopReader(producer, newReader(config.readerClass, 
getReaderProps(config)), input, config.sync)
-      finally producer.close()
-      Exit.exit(0)
-    } catch {
-      case e: joptsimple.OptionException =>
-        System.err.println(e.getMessage)
-        Exit.exit(1)
-      case e: Exception =>
-        e.printStackTrace()
-        Exit.exit(1)
-    }
-  }
-
-  private def send(producer: Producer[Array[Byte], Array[Byte]],
-                         record: ProducerRecord[Array[Byte], Array[Byte]], 
sync: Boolean): Unit = {
-    if (sync)
-      producer.send(record).get()
-    else
-      producer.send(record, new ErrorLoggingCallback(record.topic, record.key, 
record.value, false))
-  }
-
-  def getReaderProps(config: ProducerConfig): Properties = {
-    val props =
-      if (config.options.has(config.readerConfigOpt))
-        Utils.loadProps(config.options.valueOf(config.readerConfigOpt))
-      else new Properties
-    props.put("topic", config.topic)
-    props ++= config.cmdLineProps
-    props
-  }
-
-  def producerProps(config: ProducerConfig): Properties = {
-    val props =
-      if (config.options.has(config.producerConfigOpt))
-        Utils.loadProps(config.options.valueOf(config.producerConfigOpt))
-      else new Properties
-
-    props ++= config.extraProducerProps
-
-    if (config.bootstrapServer != null)
-      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
config.bootstrapServer)
-    else
-      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-
-    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
-    if (props.getProperty(ProducerConfig.CLIENT_ID_CONFIG) == null)
-      props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer")
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer")
-
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.LINGER_MS_CONFIG, config.options, 
config.sendTimeoutOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.ACKS_CONFIG, config.options, 
config.requestRequiredAcksOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.options, 
config.requestTimeoutMsOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.RETRIES_CONFIG, config.options, 
config.messageSendMaxRetriesOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.options, 
config.retryBackoffMsOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.SEND_BUFFER_CONFIG, config.options, 
config.socketBufferSizeOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.BUFFER_MEMORY_CONFIG, config.options, 
config.maxMemoryBytesOpt)
-    // We currently have 2 options to set the batch.size value. We'll 
deprecate/remove one of them in KIP-717.
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.BATCH_SIZE_CONFIG, config.options, 
config.batchSizeOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.BATCH_SIZE_CONFIG, config.options, 
config.maxPartitionMemoryBytesOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.METADATA_MAX_AGE_CONFIG, config.options, 
config.metadataExpiryMsOpt)
-    CommandLineUtils.maybeMergeOptions(
-      props, ProducerConfig.MAX_BLOCK_MS_CONFIG, config.options, 
config.maxBlockMsOpt)
-
-    props
-  }
-
-  class ProducerConfig(args: Array[String]) extends 
CommandDefaultOptions(args) {
-    val topicOpt: OptionSpec[String] = parser.accepts("topic", "REQUIRED: The 
topic id to produce messages to.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    private val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use 
--bootstrap-server instead; ignored if --bootstrap-server is specified.  The 
broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-      .withRequiredArg
-      .describedAs("broker-list")
-      .ofType(classOf[String])
-    val bootstrapServerOpt: OptionSpec[String]  = 
parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) 
is specified. The server(s) to connect to. The broker list string in the form 
HOST1:PORT1,HOST2:PORT2.")
-      .requiredUnless("broker-list")
-      .withRequiredArg
-      .describedAs("server to connect to")
-      .ofType(classOf[String])
-    private val syncOpt = parser.accepts("sync", "If set message send requests 
to the brokers are synchronously, one at a time as they arrive.")
-    val compressionCodecOpt: OptionSpec[String]  = 
parser.accepts("compression-codec", "The compression codec: either 'none', 
'gzip', 'snappy', 'lz4', or 'zstd'." +
-                                                                  "If 
specified without value, then it defaults to 'gzip'")
-                                    .withOptionalArg()
-                                    .describedAs("compression-codec")
-                                    .ofType(classOf[String])
-    val batchSizeOpt: OptionSpec[Integer] = parser.accepts("batch-size", 
"Number of messages to send in a single batch if they are not being sent 
synchronously. "+
-       "please note that this option will be replaced if 
max-partition-memory-bytes is also set")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(16 * 1024)
-    val messageSendMaxRetriesOpt: OptionSpec[Integer] = 
parser.accepts("message-send-max-retries", "Brokers can fail receiving the 
message for multiple reasons, " +
-      "and being unavailable transiently is just one of them. This property 
specifies the number of retries before the producer give up and drop this 
message. " +
-      "This is the option to control `retries` in producer configs.")
-      .withRequiredArg
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(3)
-    val retryBackoffMsOpt: OptionSpec[lang.Long] = 
parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes 
the metadata of relevant topics. " +
-      "Since leader election takes a bit of time, this property specifies the 
amount of time that the producer waits before refreshing the metadata. " +
-      "This is the option to control `retry.backoff.ms` in producer configs.")
-      .withRequiredArg
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(100)
-    val sendTimeoutOpt: OptionSpec[lang.Long] = parser.accepts("timeout", "If 
set and the producer is running in asynchronous mode, this gives the maximum 
amount of time" +
-      " a message will queue awaiting sufficient batch size. The value is 
given in ms. " +
-      "This is the option to control `linger.ms` in producer configs.")
-      .withRequiredArg
-      .describedAs("timeout_ms")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(1000)
-    val requestRequiredAcksOpt: OptionSpec[String] = 
parser.accepts("request-required-acks", "The required `acks` of the producer 
requests")
-      .withRequiredArg
-      .describedAs("request required acks")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo("-1")
-    val requestTimeoutMsOpt: OptionSpec[Integer] = 
parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. 
Value must be non-negative and non-zero.")
-      .withRequiredArg
-      .describedAs("request timeout ms")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1500)
-    val metadataExpiryMsOpt: OptionSpec[lang.Long] = 
parser.accepts("metadata-expiry-ms",
-      "The period of time in milliseconds after which we force a refresh of 
metadata even if we haven't seen any leadership changes. " +
-        "This is the option to control `metadata.max.age.ms` in producer 
configs.")
-      .withRequiredArg
-      .describedAs("metadata expiration interval")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(5*60*1000L)
-    val maxBlockMsOpt: OptionSpec[lang.Long] = parser.accepts("max-block-ms",
-      "The max time that the producer will block for during a send request.")
-      .withRequiredArg
-      .describedAs("max block on send")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(60*1000L)
-    val maxMemoryBytesOpt: OptionSpec[lang.Long] = 
parser.accepts("max-memory-bytes",
-      "The total memory used by the producer to buffer records waiting to be 
sent to the server. " +
-        "This is the option to control `buffer.memory` in producer configs.")
-      .withRequiredArg
-      .describedAs("total memory in bytes")
-      .ofType(classOf[java.lang.Long])
-      .defaultsTo(32 * 1024 * 1024L)
-    val maxPartitionMemoryBytesOpt: OptionSpec[Integer] = 
parser.accepts("max-partition-memory-bytes",
-      "The buffer size allocated for a partition. When records are received 
which are smaller than this size the producer " +
-        "will attempt to optimistically group them together until this size is 
reached. " +
-        "This is the option to control `batch.size` in producer configs.")
-      .withRequiredArg
-      .describedAs("memory in bytes per partition")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(16 * 1024)
-    private val messageReaderOpt = parser.accepts("line-reader", "The class 
name of the class to use for reading lines from standard in. " +
-      "By default each line is read as a separate message.")
-      .withRequiredArg
-      .describedAs("reader_class")
-      .ofType(classOf[java.lang.String])
-      .defaultsTo(classOf[LineMessageReader].getName)
-    val socketBufferSizeOpt: OptionSpec[Integer] = 
parser.accepts("socket-buffer-size", "The size of the tcp RECV size. " +
-      "This is the option to control `send.buffer.bytes` in producer configs.")
-      .withRequiredArg
-      .describedAs("size")
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(1024*100)
-    private val propertyOpt = parser.accepts("property",
-      """A mechanism to pass user-defined properties in the form key=value to 
the message reader. This allows custom configuration for a user-defined message 
reader.
-        |Default properties include:
-        | parse.key=false
-        | parse.headers=false
-        | ignore.error=false
-        | key.separator=\t
-        | headers.delimiter=\t
-        | headers.separator=,
-        | headers.key.separator=:
-        | null.marker=   When set, any fields (key, value and headers) equal 
to this will be replaced by null
-        |Default parsing pattern when:
-        | parse.headers=true and parse.key=true:
-        |  "h1:v1,h2:v2...\tkey\tvalue"
-        | parse.key=true:
-        |  "key\tvalue"
-        | parse.headers=true:
-        |  "h1:v1,h2:v2...\tvalue"
-      """.stripMargin
-      )
-      .withRequiredArg
-      .describedAs("prop")
-      .ofType(classOf[String])
-    val readerConfigOpt: OptionSpec[String] = parser.accepts("reader-config", 
s"Config properties file for the message reader. Note that $propertyOpt takes 
precedence over this config.")
-      .withRequiredArg
-      .describedAs("config file")
-      .ofType(classOf[String])
-    private val producerPropertyOpt = parser.accepts("producer-property", "A 
mechanism to pass user-defined properties in the form key=value to the 
producer. ")
-            .withRequiredArg
-            .describedAs("producer_prop")
-            .ofType(classOf[String])
-    val producerConfigOpt: OptionSpec[String]  = 
parser.accepts("producer.config", s"Producer config properties file. Note that 
$producerPropertyOpt takes precedence over this config.")
-      .withRequiredArg
-      .describedAs("config file")
-      .ofType(classOf[String])
-
-    options = tryParse(parser, args)
-
-    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read 
data from standard input and publish it to Kafka.")
-
-    CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
-
-    val topic: String = options.valueOf(topicOpt)
-
-    val bootstrapServer: String = options.valueOf(bootstrapServerOpt)
-    val brokerList: String = options.valueOf(brokerListOpt)
-
-    val brokerHostsAndPorts: String = options.valueOf(if 
(options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
-    ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts)
-
-    val sync: Boolean = options.has(syncOpt)
-    private val compressionCodecOptionValue = 
options.valueOf(compressionCodecOpt)
-    val compressionCodec: String = if (options.has(compressionCodecOpt))
-                             if (compressionCodecOptionValue == null || 
compressionCodecOptionValue.isEmpty)
-                               CompressionType.GZIP.name
-                             else compressionCodecOptionValue
-                           else CompressionType.NONE.name
-    val readerClass: String = options.valueOf(messageReaderOpt)
-    val cmdLineProps: Properties = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
-    val extraProducerProps: Properties = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
-
-    def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
-      try
-        parser.parse(args: _*)
-      catch {
-        case e: OptionException =>
-          ToolsUtils.printUsageAndExit(parser, e.getMessage)
-      }
-    }
-  }
-
-  class LineMessageReader extends RecordReader {
-    var topic: String = _
-    var parseKey: Boolean = false
-    var keySeparator: String = "\t"
-    var parseHeaders: Boolean = false
-    private var headersDelimiter = "\t"
-    var headersSeparator: String = ","
-    private var headersKeySeparator = ":"
-    private var ignoreError = false
-    private var lineNumber = 0
-    private val printPrompt = System.console != null
-    private var headersSeparatorPattern: Pattern = _
-    private var nullMarker: String = _
-
-    override def configure(props: java.util.Map[String, _]): Unit = {
-      topic = props.get("topic").toString
-      if (props.containsKey("parse.key"))
-        parseKey = 
props.get("parse.key").toString.trim.equalsIgnoreCase("true")
-      if (props.containsKey("key.separator"))
-        keySeparator = props.get("key.separator").toString
-      if (props.containsKey("parse.headers"))
-        parseHeaders = 
props.get("parse.headers").toString.trim.equalsIgnoreCase("true")
-      if (props.containsKey("headers.delimiter"))
-        headersDelimiter = props.get("headers.delimiter").toString
-      if (props.containsKey("headers.separator"))
-        headersSeparator = props.get("headers.separator").toString
-      headersSeparatorPattern = Pattern.compile(headersSeparator)
-      if (props.containsKey("headers.key.separator"))
-        headersKeySeparator = props.get("headers.key.separator").toString
-      if (props.containsKey("ignore.error"))
-        ignoreError = 
props.get("ignore.error").toString.trim.equalsIgnoreCase("true")
-      if (headersDelimiter == headersSeparator)
-        throw new KafkaException("headers.delimiter and headers.separator may 
not be equal")
-      if (headersDelimiter == headersKeySeparator)
-        throw new KafkaException("headers.delimiter and headers.key.separator 
may not be equal")
-      if (headersSeparator == headersKeySeparator)
-        throw new KafkaException("headers.separator and headers.key.separator 
may not be equal")
-      if (props.containsKey("null.marker"))
-        nullMarker = props.get("null.marker").toString
-      if (nullMarker == keySeparator)
-        throw new KafkaException("null.marker and key.separator may not be 
equal")
-      if (nullMarker == headersSeparator)
-        throw new KafkaException("null.marker and headers.separator may not be 
equal")
-      if (nullMarker == headersDelimiter)
-        throw new KafkaException("null.marker and headers.delimiter may not be 
equal")
-      if (nullMarker == headersKeySeparator)
-        throw new KafkaException("null.marker and headers.key.separator may 
not be equal")
-    }
-
-    override def readRecords(inputStream: InputStream): 
java.util.Iterator[ProducerRecord[Array[Byte], Array[Byte]]] =
-      new java.util.Iterator[ProducerRecord[Array[Byte], Array[Byte]]] {
-        private[this] val reader = new BufferedReader(new 
InputStreamReader(inputStream, StandardCharsets.UTF_8))
-        private[this] var current: ProducerRecord[Array[Byte], Array[Byte]] = _
-        override def hasNext: Boolean =
-          if (current != null) true
-          else {
-            lineNumber += 1
-            if (printPrompt) print(">")
-            val line = reader.readLine()
-            current = line match {
-              case null => null
-              case line =>
-                val headers = parse(parseHeaders, line, 0, headersDelimiter, 
"headers delimiter")
-                val headerOffset = if (headers == null) 0 else headers.length 
+ headersDelimiter.length
-
-                val key = parse(parseKey, line, headerOffset, keySeparator, 
"key separator")
-                val keyOffset = if (key == null) 0 else key.length + 
keySeparator.length
-
-                val value = line.substring(headerOffset + keyOffset)
-
-                val record = new ProducerRecord[Array[Byte], Array[Byte]](
-                  topic,
-                  if (key != null && key != nullMarker) 
key.getBytes(StandardCharsets.UTF_8) else null,
-                  if (value != null && value != nullMarker) 
value.getBytes(StandardCharsets.UTF_8) else null,
-                )
-
-                if (headers != null && headers != nullMarker) {
-                  splitHeaders(headers)
-                    .foreach(header => record.headers.add(header._1, 
header._2))
-                }
-                record
-            }
-            current != null
-          }
-
-        override def next(): ProducerRecord[Array[Byte], Array[Byte]] = if 
(!hasNext) throw new NoSuchElementException("no more record")
-        else try current finally current = null
-      }
-
-
-    private def parse(enabled: Boolean, line: String, startIndex: Int, 
demarcation: String, demarcationName: String): String = {
-      (enabled, line.indexOf(demarcation, startIndex)) match {
-        case (false, _) => null
-        case (_, -1) =>
-          if (ignoreError) null
-          else throw new KafkaException(s"No $demarcationName found on line 
number $lineNumber: '$line'")
-        case (_, index) => line.substring(startIndex, index)
-      }
-    }
-
-    private def splitHeaders(headers: String): Array[(String, Array[Byte])] = {
-      headersSeparatorPattern.split(headers).map { pair =>
-        (pair.indexOf(headersKeySeparator), ignoreError) match {
-          case (-1, false) => throw new KafkaException(s"No header key 
separator found in pair '$pair' on line number $lineNumber")
-          case (-1, true) => (pair, null)
-          case (i, _) =>
-            val headerKey = pair.substring(0, i) match {
-              case k if k == nullMarker =>
-                throw new KafkaException(s"Header keys should not be equal to 
the null marker '$nullMarker' as they can't be null")
-              case k => k
-            }
-            val headerValue = pair.substring(i + headersKeySeparator.length) 
match {
-              case v if v == nullMarker => null
-              case v => v.getBytes(StandardCharsets.UTF_8)
-            }
-            (headerKey, headerValue)
-        }
-      }
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala 
b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 5c076c7f746..7831ee64d1e 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -20,24 +20,10 @@ import joptsimple.OptionParser
 import org.apache.kafka.server.util.CommandLineUtils
 
 object ToolsUtils {
-
-  def validatePortOrDie(parser: OptionParser, hostPort: String): Unit = {
-    val hostPorts: Array[String] = if (hostPort.contains(','))
-      hostPort.split(",")
-    else
-      Array(hostPort)
-    val validHostPort = hostPorts.filter { hostPortData =>
-      org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
-    }
-    val isValid = validHostPort.nonEmpty && validHostPort.length == 
hostPorts.length
-    if (!isValid)
-      CommandLineUtils.printUsageAndExit(parser, "Please provide valid 
host:port like host1:9091,host2:9092\n ")
-  }
-
   /**
    * This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
    * It is needed for tools migration (KAFKA-14525), as there is no Java 
equivalent for return type `Nothing`.
-   * Can be removed once [[kafka.tools.ConsoleProducer]] are migrated.
+   * Can be removed once ZooKeeper related code are deleted.
    *
    * @param parser Command line options parser.
    * @param message Error message.
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
deleted file mode 100644
index ab27ffc9595..00000000000
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import kafka.tools.ConsoleProducer.LineMessageReader
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{Producer, ProducerConfig, 
ProducerRecord}
-import org.apache.kafka.common.utils.Exit
-import org.apache.kafka.tools.api.RecordReader
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
-import org.junit.jupiter.api.Test
-import org.mockito.Mockito
-
-import java.io.InputStream
-import java.util
-import java.util.Properties
-
-class ConsoleProducerTest {
-
-  val brokerListValidArgs: Array[String] = Array(
-    "--broker-list",
-    "localhost:1001,localhost:1002",
-    "--topic",
-    "t3",
-    "--property",
-    "parse.key=true",
-    "--property",
-    "key.separator=#"
-  )
-  val bootstrapServerValidArgs: Array[String] = Array(
-    "--bootstrap-server",
-    "localhost:1003,localhost:1004",
-    "--topic",
-    "t3",
-    "--property",
-    "parse.key=true",
-    "--property",
-    "key.separator=#"
-  )
-  val invalidArgs: Array[String] = Array(
-    "--t", // not a valid argument
-    "t3"
-  )
-  val bootstrapServerOverride: Array[String] = Array(
-    "--broker-list",
-    "localhost:1001",
-    "--bootstrap-server",
-    "localhost:1002",
-    "--topic",
-    "t3",
-  )
-  val clientIdOverride: Array[String] = Array(
-    "--broker-list",
-    "localhost:1001",
-    "--topic",
-    "t3",
-    "--producer-property",
-    "client.id=producer-1"
-  )
-  val batchSizeOverriddenByMaxPartitionMemoryBytesValue: Array[String] = Array(
-    "--broker-list",
-    "localhost:1001",
-    "--bootstrap-server",
-    "localhost:1002",
-    "--topic",
-    "t3",
-    "--batch-size",
-    "123",
-    "--max-partition-memory-bytes",
-    "456"
-  )
-  val btchSizeSetAndMaxPartitionMemoryBytesNotSet: Array[String] = Array(
-    "--broker-list",
-    "localhost:1001",
-    "--bootstrap-server",
-    "localhost:1002",
-    "--topic",
-    "t3",
-    "--batch-size",
-    "123"
-  )
-  val batchSizeNotSetAndMaxPartitionMemoryBytesSet: Array[String] = Array(
-    "--broker-list",
-    "localhost:1001",
-    "--bootstrap-server",
-    "localhost:1002",
-    "--topic",
-    "t3",
-    "--max-partition-memory-bytes",
-    "456"
-  )
-  val batchSizeDefault: Array[String] = Array(
-    "--broker-list",
-    "localhost:1001",
-    "--bootstrap-server",
-    "localhost:1002",
-    "--topic",
-    "t3"
-  )
-
-  @Test
-  def testValidConfigsBrokerList(): Unit = {
-    val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals(util.Arrays.asList("localhost:1001", "localhost:1002"),
-      producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
-  }
-
-  @Test
-  def testValidConfigsBootstrapServer(): Unit = {
-    val config = new ConsoleProducer.ProducerConfig(bootstrapServerValidArgs)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals(util.Arrays.asList("localhost:1003", "localhost:1004"),
-      producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
-  }
-
-  @Test
-  def testInvalidConfigs(): Unit = {
-    Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message))
-    try assertThrows(classOf[IllegalArgumentException], () => new 
ConsoleProducer.ProducerConfig(invalidArgs))
-    finally Exit.resetExitProcedure()
-  }
-
-  @Test
-  def testParseKeyProp(): Unit = {
-    val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
-    val reader = 
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
-    
reader.configure(ConsoleProducer.getReaderProps(config).asInstanceOf[java.util.Map[String,
 _]])
-    assertTrue(reader.keySeparator == "#")
-    assertTrue(reader.parseKey)
-  }
-
-  @Test
-  def testParseReaderConfigFile(): Unit = {
-    val propsFile = TestUtils.tempPropertiesFile(Map("parse.key" -> "true", 
"key.separator" -> "|"))
-
-    val args = Array(
-      "--bootstrap-server", "localhost:9092",
-      "--topic", "test",
-      "--property", "key.separator=;",
-      "--property", "parse.headers=true",
-      "--reader-config", propsFile.getAbsolutePath
-    )
-    val config = new ConsoleProducer.ProducerConfig(args)
-    val reader = 
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
-    
reader.configure(ConsoleProducer.getReaderProps(config).asInstanceOf[java.util.Map[String,
 _]])
-    assertEquals(";", reader.keySeparator)
-    assertTrue(reader.parseKey)
-    assertTrue(reader.parseHeaders)
-  }
-
-  @Test
-  def testBootstrapServerOverride(): Unit = {
-    val config = new ConsoleProducer.ProducerConfig(bootstrapServerOverride)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals(util.Arrays.asList("localhost:1002"),
-      producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
-  }
-
-  @Test
-  def testClientIdOverride(): Unit = {
-    val config = new ConsoleProducer.ProducerConfig(clientIdOverride)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals("producer-1",
-      producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
-  }
-
-  @Test
-  def testDefaultClientId(): Unit = {
-    val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals("console-producer",
-      producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
-  }
-
-  @Test
-  def testBatchSizeOverriddenByMaxPartitionMemoryBytesValue(): Unit = {
-    val config = new 
ConsoleProducer.ProducerConfig(batchSizeOverriddenByMaxPartitionMemoryBytesValue)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals(456,
-      producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
-  }
-
-  @Test
-  def testBatchSizeSetAndMaxPartitionMemoryBytesNotSet(): Unit = {
-    val config = new 
ConsoleProducer.ProducerConfig(btchSizeSetAndMaxPartitionMemoryBytesNotSet)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals(123,
-      producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
-  }
-
-  @Test
-  def testDefaultBatchSize(): Unit = {
-    val config = new ConsoleProducer.ProducerConfig(batchSizeDefault)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals(16*1024,
-      producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
-  }
-
-  @Test
-  def testBatchSizeNotSetAndMaxPartitionMemoryBytesSet (): Unit = {
-    val config = new 
ConsoleProducer.ProducerConfig(batchSizeNotSetAndMaxPartitionMemoryBytesSet)
-    val producerConfig = new 
ProducerConfig(ConsoleProducer.producerProps(config))
-    assertEquals(456,
-      producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
-  }
-
-  @Test
-  def testNewReader(): Unit = {
-    ConsoleProducerTest.configureCount = 0
-    ConsoleProducerTest.closeCount = 0
-    val reader1 = 
ConsoleProducer.newReader(classOf[ConsoleProducerTest.TestRecordReader].getName,
 new Properties())
-    assertEquals(1, ConsoleProducerTest.configureCount)
-    assertEquals(0, ConsoleProducerTest.closeCount)
-    reader1.close()
-    assertEquals(1, ConsoleProducerTest.closeCount)
-  }
-
-  @Test
-  def testLoopReader(): Unit = {
-    ConsoleProducerTest.configureCount = 0
-    ConsoleProducerTest.closeCount = 0
-    val reader = 
ConsoleProducer.newReader(classOf[ConsoleProducerTest.TestRecordReader].getName,
 new Properties())
-
-    ConsoleProducer.loopReader(Mockito.mock(classOf[Producer[Array[Byte], 
Array[Byte]]]),
-      reader, System.in, false)
-
-    assertEquals(1, ConsoleProducerTest.configureCount)
-    assertEquals(1, ConsoleProducerTest.closeCount)
-  }
-}
-
-object ConsoleProducerTest {
-  var configureCount = 0
-  var closeCount = 0
-
-  class TestRecordReader extends RecordReader {
-    override def configure(configs: util.Map[String, _]): Unit = 
configureCount += 1
-    override def readRecords(inputStream: InputStream): 
java.util.Iterator[ProducerRecord[Array[Byte], Array[Byte]]] =
-      java.util.Collections.emptyIterator()
-
-    override def close(): Unit = closeCount += 1
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/tools/LineMessageReaderTest.scala 
b/core/src/test/scala/unit/kafka/tools/LineMessageReaderTest.scala
deleted file mode 100644
index 740f42762de..00000000000
--- a/core/src/test/scala/unit/kafka/tools/LineMessageReaderTest.scala
+++ /dev/null
@@ -1,354 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License") you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import kafka.tools.ConsoleProducer.LineMessageReader
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.KafkaException
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows}
-import org.junit.jupiter.api.Test
-
-import java.io.ByteArrayInputStream
-import java.util.Properties
-
-class LineMessageReaderTest {
-
-  private def defaultTestProps = {
-    val props = new Properties
-    props.put("topic", "topic")
-    props.put("parse.key", "true")
-    props.put("parse.headers", "true")
-    props
-  }
-
-  @Test
-  def testLineReader(): Unit = {
-    val input = "key0\tvalue0\nkey1\tvalue1"
-
-    val props = defaultTestProps
-    props.put("parse.headers", "false")
-
-    runTest(props, input, record("key0", "value0"), record("key1", "value1"))
-  }
-
-  @Test
-  def testLineReaderHeader(): Unit = {
-    val input = 
"headerKey0:headerValue0,headerKey1:headerValue1\tkey0\tvalue0\n"
-    val expected = record("key0", "value0", List("headerKey0" -> 
"headerValue0", "headerKey1" -> "headerValue1"))
-    runTest(defaultTestProps, input, expected)
-  }
-
-  @Test
-  def testMinimalValidInputWithHeaderKeyAndValue(): Unit = {
-    runTest(defaultTestProps, ":\t\t", record("", "", List("" -> "")))
-  }
-
-  @Test
-  def testKeyMissingValue(): Unit = {
-    val props = defaultTestProps
-    props.put("parse.headers", "false")
-    runTest(props, "key\t", record("key", ""))
-  }
-
-  @Test
-  def testDemarcationsLongerThanOne(): Unit = {
-    val props = defaultTestProps
-    props.put("key.separator", "\t\t")
-    props.put("headers.delimiter", "\t\t")
-    props.put("headers.separator", "---")
-    props.put("headers.key.separator", "::::")
-
-    runTest(
-      props,
-      "headerKey0.0::::headerValue0.0---headerKey1.0::::\t\tkey\t\tvalue",
-      record("key", "value", List("headerKey0.0" -> "headerValue0.0", 
"headerKey1.0"-> ""))
-    )
-  }
-
-  @Test
-  def testLineReaderHeaderNoKey(): Unit = {
-    val input = "headerKey:headerValue\tvalue\n"
-
-    val props = defaultTestProps
-    props.put("parse.key", "false")
-
-    runTest(props, input, record(null, "value", List("headerKey" -> 
"headerValue")))
-  }
-
-  @Test
-  def testLineReaderOnlyValue(): Unit = {
-    val props = defaultTestProps
-    props.put("parse.key", "false")
-    props.put("parse.headers", "false")
-
-    runTest(props, "value\n", record(null, "value"))
-  }
-
-  @Test
-  def 
testParseHeaderEnabledWithCustomDelimiterAndVaryingNumberOfKeyValueHeaderPairs():
 Unit = {
-    val props = defaultTestProps
-    props.put("key.separator", "#")
-    props.put("headers.delimiter", "!")
-    props.put("headers.separator", "&")
-    props.put("headers.key.separator", ":")
-
-    val input =
-      "headerKey0.0:headerValue0.0&headerKey0.1:headerValue0.1!key0#value0\n" +
-      "headerKey1.0:headerValue1.0!key1#value1"
-
-    val record0 = record("key0", "value0", List("headerKey0.0" -> 
"headerValue0.0", "headerKey0.1" -> "headerValue0.1"))
-    val record1 = record("key1", "value1", List("headerKey1.0" -> 
"headerValue1.0"))
-
-    runTest(props, input, record0, record1)
-  }
-
-  @Test
-  def testMissingKeySeparator(): Unit = {
-    val lineReader = new LineMessageReader
-    val input =
-      
"headerKey0.0:headerValue0.0,headerKey0.1:headerValue0.1\tkey0\tvalue0\n" +
-      "headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1"
-
-    lineReader.configure(defaultTestProps.asInstanceOf[java.util.Map[String, 
_]])
-    val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
-    iter.next()
-
-    val expectedException = assertThrows(classOf[KafkaException], () => 
iter.next())
-
-    assertEquals(
-      "No key separator found on line number 2: 
'headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1'",
-      expectedException.getMessage
-    )
-  }
-
-  @Test
-  def testMissingHeaderKeySeparator(): Unit = {
-    val lineReader = new LineMessageReader()
-    val input = "key[MISSING-DELIMITER]val\tkey0\tvalue0\n"
-    lineReader.configure(defaultTestProps.asInstanceOf[java.util.Map[String, 
_]])
-    val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
-
-    val expectedException = assertThrows(classOf[KafkaException], () => 
iter.next())
-
-    assertEquals(
-      "No header key separator found in pair 'key[MISSING-DELIMITER]val' on 
line number 1",
-      expectedException.getMessage
-    )
-  }
-
-  @Test
-  def testHeaderDemarcationCollision(): Unit = {
-    val props = defaultTestProps
-    props.put("headers.delimiter", "\t")
-    props.put("headers.separator", "\t")
-    props.put("headers.key.separator", "\t")
-
-    assertThrowsOnInvalidPatternConfig(props, "headers.delimiter and 
headers.separator may not be equal")
-
-    props.put("headers.separator", ",")
-    assertThrowsOnInvalidPatternConfig(props, "headers.delimiter and 
headers.key.separator may not be equal")
-
-    props.put("headers.key.separator", ",")
-    assertThrowsOnInvalidPatternConfig(props, "headers.separator and 
headers.key.separator may not be equal")
-  }
-
-  private def assertThrowsOnInvalidPatternConfig(props: Properties, 
expectedMessage: String): Unit = {
-    val exception = assertThrows(classOf[KafkaException], () => new 
LineMessageReader().configure(props.asInstanceOf[java.util.Map[String, _]]))
-    assertEquals(
-      expectedMessage,
-      exception.getMessage
-    )
-  }
-
-  @Test
-  def testIgnoreErrorInInput(): Unit = {
-    val input =
-      "headerKey0.0:headerValue0.0\tkey0\tvalue0\n" +
-      
"headerKey1.0:headerValue1.0,headerKey1.1:headerValue1.1[MISSING-HEADER-DELIMITER]key1\tvalue1\n"
 +
-      "headerKey2.0:headerValue2.0\tkey2[MISSING-KEY-DELIMITER]value2\n" +
-      
"headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3\n"
-
-    val props = defaultTestProps
-    props.put("ignore.error", "true")
-
-    val validRecord = record("key0", "value0", List("headerKey0.0" -> 
"headerValue0.0"))
-
-    val missingHeaderDelimiter: ProducerRecord[String, String] =
-      record(
-        null,
-        "value1",
-        List("headerKey1.0" -> "headerValue1.0", "headerKey1.1" -> 
"headerValue1.1[MISSING-HEADER-DELIMITER]key1")
-      )
-
-    val missingKeyDelimiter: ProducerRecord[String, String] =
-      record(
-        null,
-        "key2[MISSING-KEY-DELIMITER]value2",
-        List("headerKey2.0" -> "headerValue2.0")
-      )
-
-    val missingKeyHeaderDelimiter: ProducerRecord[String, String] =
-      record(
-        null,
-        
"headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3",
-        List()
-      )
-
-    runTest(props, input, validRecord, missingHeaderDelimiter, 
missingKeyDelimiter, missingKeyHeaderDelimiter)
-  }
-
-  @Test
-  def testMalformedHeaderIgnoreError(): Unit = {
-    val input = "key-val\tkey0\tvalue0\n"
-
-    val props = defaultTestProps
-    props.put("ignore.error", "true")
-
-    val expected = record("key0", "value0", List("key-val" -> null))
-
-    runTest(props, input, expected)
-  }
-
-  @Test
-  def testNullMarker(): Unit = {
-    val input =
-      "key\t\n" +
-      "key\t<NULL>\n" +
-      "key\t<NULL>value\n" +
-      "<NULL>\tvalue\n" +
-      "<NULL>\t<NULL>"
-
-    val props = defaultTestProps
-    props.put("null.marker", "<NULL>")
-    props.put("parse.headers", "false")
-    runTest(props, input,
-      record("key", ""),
-      record("key", null),
-      record("key", "<NULL>value"),
-      record(null, "value"),
-      record(null, null))
-
-    // If the null marker is not set
-    props.remove("null.marker")
-    runTest(props, input,
-      record("key", ""),
-      record("key", "<NULL>"),
-      record("key", "<NULL>value"),
-      record("<NULL>", "value"),
-      record("<NULL>", "<NULL>"))
-  }
-
-  @Test
-  def testNullMarkerWithHeaders(): Unit = {
-    val input =
-      "h0:v0,h1:v1\t<NULL>\tvalue\n" +
-      "<NULL>\tkey\t<NULL>\n" +
-      "h0:,h1:v1\t<NULL>\t<NULL>\n" +
-      "h0:<NULL>,h1:v1\tkey\t<NULL>\n" +
-      "h0:<NULL>,h1:<NULL>value\tkey\t<NULL>\n"
-    val header = "h1" -> "v1"
-
-    val props = defaultTestProps
-    props.put("null.marker", "<NULL>")
-    runTest(props, input,
-      record(null, "value", List("h0" -> "v0", header)),
-      record("key", null),
-      record(null, null, List("h0" -> "", header)),
-      record("key", null, List("h0" -> null, header)),
-      record("key", null, List("h0" -> null, "h1" -> "<NULL>value")))
-
-    // If the null marker is not set
-    val lineReader = new LineMessageReader()
-    props.remove("null.marker")
-    lineReader.configure(props.asInstanceOf[java.util.Map[String, _]])
-    val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
-    assertRecordEquals(record("<NULL>", "value", List("h0" -> "v0", header)), 
iter.next())
-    // line 2 is not valid anymore
-    val expectedException = assertThrows(classOf[KafkaException], () => 
iter.next())
-    assertEquals(
-      "No header key separator found in pair '<NULL>' on line number 2",
-      expectedException.getMessage
-    )
-    assertRecordEquals(record("<NULL>", "<NULL>", List("h0" -> "", header)), 
iter.next())
-    assertRecordEquals(record("key", "<NULL>", List("h0" -> "<NULL>", 
header)), iter.next())
-    assertRecordEquals(record("key", "<NULL>", List("h0" -> "<NULL>", "h1" -> 
"<NULL>value")), iter.next())
-  }
-
-  @Test
-  def testNullMarkerHeaderKeyThrows(): Unit = {
-    val input = "<NULL>:v0,h1:v1\tkey\tvalue\n"
-
-    val props = defaultTestProps
-    props.put("null.marker", "<NULL>")
-    val lineReader = new LineMessageReader()
-    lineReader.configure(props.asInstanceOf[java.util.Map[String, _]])
-    val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
-    val expectedException = assertThrows(classOf[KafkaException], () => 
iter.next())
-    assertEquals(
-      "Header keys should not be equal to the null marker '<NULL>' as they 
can't be null",
-      expectedException.getMessage
-    )
-
-    // If the null marker is not set
-    props.remove("null.marker")
-    runTest(props, input, record("key", "value", List("<NULL>" -> "v0", "h1" 
-> "v1")))
-  }
-
-  @Test
-  def testInvalidNullMarker(): Unit = {
-    val props = defaultTestProps
-    props.put("headers.delimiter", "-")
-    props.put("headers.separator", ":")
-    props.put("headers.key.separator", "/")
-
-    props.put("null.marker", "-")
-    assertThrowsOnInvalidPatternConfig(props, "null.marker and 
headers.delimiter may not be equal")
-
-    props.put("null.marker", ":")
-    assertThrowsOnInvalidPatternConfig(props, "null.marker and 
headers.separator may not be equal")
-
-    props.put("null.marker", "/")
-    assertThrowsOnInvalidPatternConfig(props, "null.marker and 
headers.key.separator may not be equal")
-  }
-
-  def runTest(props: Properties, input: String, expectedRecords: 
ProducerRecord[String, String]*): Unit = {
-    val lineReader = new LineMessageReader
-    lineReader.configure(props.asInstanceOf[java.util.Map[String, _]])
-    val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
-    expectedRecords.foreach(r => assertRecordEquals(r, iter.next()))
-    assertFalse(iter.hasNext)
-    assertThrows(classOf[NoSuchElementException], () => iter.next())
-  }
-
-  //  The equality method of ProducerRecord compares memory references for the 
header iterator, this is why this custom equality check is used.
-  private def assertRecordEquals[K, V](expected: ProducerRecord[K, V], actual: 
ProducerRecord[Array[Byte], Array[Byte]]): Unit = {
-    assertEquals(expected.key, if (actual.key == null) null else new 
String(actual.key))
-    assertEquals(expected.value, if (actual.value == null) null else new 
String(actual.value))
-    assertEquals(expected.headers.toArray.toList, 
actual.headers.toArray.toList)
-  }
-
-  private def record[K, V](key: K, value: V, headers: List[(String, String)]): 
ProducerRecord[K, V] = {
-    val record = new ProducerRecord("topic", key, value)
-    headers.foreach(h => record.headers.add(h._1, if (h._2 != null) 
h._2.getBytes else null))
-    record
-  }
-
-  private def record[K, V](key: K, value: V): ProducerRecord[K, V] = {
-    new ProducerRecord("topic", key, value)
-  }
-}
diff --git a/docker/native/native-image-configs/reflect-config.json 
b/docker/native/native-image-configs/reflect-config.json
index 838e35d6f9b..dc7a219be4f 100644
--- a/docker/native/native-image-configs/reflect-config.json
+++ b/docker/native/native-image-configs/reflect-config.json
@@ -716,7 +716,7 @@
   "methods":[{"name":"<init>","parameterTypes":[] }]
 },
 {
-  "name":"kafka.tools.ConsoleProducer$LineMessageReader",
+  "name":"org.apache.kafka.tools.LineMessageReader",
   "methods":[{"name":"<init>","parameterTypes":[] }]
 },
 {
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java 
b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
new file mode 100644
index 00000000000..23da5b1b2d3
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.tools.api.RecordReader;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.common.utils.Utils.propsToStringMap;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+public class ConsoleProducer {
+    public static void main(String[] args) {
+        ConsoleProducer consoleProducer = new ConsoleProducer();
+        consoleProducer.start(args);
+    }
+
+    void start(String[] args) {
+        try {
+            ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
+            RecordReader reader = messageReader(opts);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(opts.producerProps());
+
+            Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+            loopReader(producer, reader, opts.sync());
+        } catch (OptionException e) {
+            System.err.println(e.getMessage());
+            Exit.exit(1);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Exit.exit(1);
+        }
+        Exit.exit(0);
+    }
+
+    // Visible for testing
+    RecordReader messageReader(ConsoleProducerOptions opts) throws Exception {
+        Object objReader = 
Class.forName(opts.readerClass()).getDeclaredConstructor().newInstance();
+
+        if (objReader instanceof RecordReader) {
+            RecordReader reader = (RecordReader) objReader;
+            reader.configure(opts.readerProps());
+
+            return reader;
+        }
+
+        throw new IllegalArgumentException("The reader must implement " + 
RecordReader.class.getName() + " interface");
+    }
+
+    // Visible for testing
+    void loopReader(Producer<byte[], byte[]> producer, RecordReader reader, 
boolean sync) throws Exception {
+        Iterator<ProducerRecord<byte[], byte[]>> iter = 
reader.readRecords(System.in);
+        try {
+            while (iter.hasNext()) {
+                send(producer, iter.next(), sync);
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
+    private void send(Producer<byte[], byte[]> producer, 
ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+        if (sync) {
+            producer.send(record).get();
+        } else {
+            producer.send(record, new ErrorLoggingCallback(record.topic(), 
record.key(), record.value(), false));
+        }
+    }
+
+    static final class ConsoleProducerOptions extends CommandDefaultOptions {
+        private final OptionSpec<String> topicOpt;
+        private final OptionSpec<String> bootstrapServerOpt;
+        private final OptionSpec<Void> syncOpt;
+        private final OptionSpec<String> compressionCodecOpt;
+        private final OptionSpec<Integer> batchSizeOpt;
+        private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+        private final OptionSpec<Long> retryBackoffMsOpt;
+        private final OptionSpec<Long> sendTimeoutOpt;
+        private final OptionSpec<String> requestRequiredAcksOpt;
+        private final OptionSpec<Integer> requestTimeoutMsOpt;
+        private final OptionSpec<Long> metadataExpiryMsOpt;
+        private final OptionSpec<Long> maxBlockMsOpt;
+        private final OptionSpec<Long> maxMemoryBytesOpt;
+        private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+        private final OptionSpec<String> messageReaderOpt;
+        private final OptionSpec<Integer> socketBufferSizeOpt;
+        private final OptionSpec<String> propertyOpt;
+        private final OptionSpec<String> readerConfigOpt;
+        private final OptionSpec<String> producerPropertyOpt;
+        private final OptionSpec<String> producerConfigOpt;
+
+        public ConsoleProducerOptions(String[] args) {
+            super(args);
+            topicOpt = parser.accepts("topic", "REQUIRED: The topic name to 
produce messages to.")
+                    .withRequiredArg()
+                    .describedAs("topic")
+                    .ofType(String.class);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: 
The server(s) to connect to. The broker list string in the form 
HOST1:PORT1,HOST2:PORT2.")
+                    .withRequiredArg()
+                    .describedAs("server to connect to")
+                    .ofType(String.class);
+            syncOpt = parser.accepts("sync", "If set message send requests to 
the brokers are synchronously, one at a time as they arrive.");
+            compressionCodecOpt = parser.accepts("compression-codec", "The 
compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." +
+                            "If specified without value, then it defaults to 
'gzip'")
+                    .withOptionalArg()
+                    .describedAs("compression-codec")
+                    .ofType(String.class);
+            batchSizeOpt = parser.accepts("batch-size", "Number of messages to 
send in a single batch if they are not being sent synchronously. " +
+                            "please note that this option will be replaced if 
max-partition-memory-bytes is also set")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageSendMaxRetriesOpt = 
parser.accepts("message-send-max-retries", "Brokers can fail receiving the 
message for multiple reasons, " +
+                            "and being unavailable transiently is just one of 
them. This property specifies the number of retries before the producer give up 
and drop this message. " +
+                            "This is the option to control `retries` in 
producer configs.")
+                    .withRequiredArg()
+                    .ofType(Integer.class)
+                    .defaultsTo(3);
+            retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before 
each retry, the producer refreshes the metadata of relevant topics. " +
+                            "Since leader election takes a bit of time, this 
property specifies the amount of time that the producer waits before refreshing 
the metadata. " +
+                            "This is the option to control `retry.backoff.ms` 
in producer configs.")
+                    .withRequiredArg()
+                    .ofType(Long.class)
+                    .defaultsTo(100L);
+            sendTimeoutOpt = parser.accepts("timeout", "If set and the 
producer is running in asynchronous mode, this gives the maximum amount of 
time" +
+                            " a message will queue awaiting sufficient batch 
size. The value is given in ms. " +
+                            "This is the option to control `linger.ms` in 
producer configs.")
+                    .withRequiredArg()
+                    .describedAs("timeout_ms")
+                    .ofType(Long.class)
+                    .defaultsTo(1000L);
+            requestRequiredAcksOpt = parser.accepts("request-required-acks", 
"The required `acks` of the producer requests")
+                    .withRequiredArg()
+                    .describedAs("request required acks")
+                    .ofType(String.class)
+                    .defaultsTo("-1");
+            requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The 
ack timeout of the producer requests. Value must be non-negative and non-zero.")
+                    .withRequiredArg()
+                    .describedAs("request timeout ms")
+                    .ofType(Integer.class)
+                    .defaultsTo(1500);
+            metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+                            "The period of time in milliseconds after which we 
force a refresh of metadata even if we haven't seen any leadership changes. " +
+                                    "This is the option to control 
`metadata.max.age.ms` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("metadata expiration interval")
+                    .ofType(Long.class)
+                    .defaultsTo(5 * 60 * 1000L);
+            maxBlockMsOpt = parser.accepts("max-block-ms",
+                            "The max time that the producer will block for 
during a send request.")
+                    .withRequiredArg()
+                    .describedAs("max block on send")
+                    .ofType(Long.class)
+                    .defaultsTo(60 * 1000L);
+            maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+                            "The total memory used by the producer to buffer 
records waiting to be sent to the server. " +
+                                    "This is the option to control 
`buffer.memory` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("total memory in bytes")
+                    .ofType(Long.class)
+                    .defaultsTo(32 * 1024 * 1024L);
+            maxPartitionMemoryBytesOpt = 
parser.accepts("max-partition-memory-bytes",
+                            "The buffer size allocated for a partition. When 
records are received which are smaller than this size the producer " +
+                                    "will attempt to optimistically group them 
together until this size is reached. " +
+                                    "This is the option to control 
`batch.size` in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("memory in bytes per partition")
+                    .ofType(Integer.class)
+                    .defaultsTo(16 * 1024);
+            messageReaderOpt = parser.accepts("line-reader", "The class name 
of the class to use for reading lines from standard in. " +
+                            "By default each line is read as a separate 
message.")
+                    .withRequiredArg()
+                    .describedAs("reader_class")
+                    .ofType(String.class)
+                    .defaultsTo(LineMessageReader.class.getName());
+            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The 
size of the tcp RECV size. " +
+                            "This is the option to control `send.buffer.bytes` 
in producer configs.")
+                    .withRequiredArg()
+                    .describedAs("size")
+                    .ofType(Integer.class)
+                    .defaultsTo(1024 * 100);
+            propertyOpt = parser.accepts("property",
+                            "A mechanism to pass user-defined properties in 
the form key=value to the message reader. This allows custom configuration for 
a user-defined message reader." +
+                                    "\nDefault properties include:" +
+                                    "\n parse.key=false" +
+                                    "\n parse.headers=false" +
+                                    "\n ignore.error=false" +
+                                    "\n key.separator=\\t" +
+                                    "\n headers.delimiter=\\t" +
+                                    "\n headers.separator=," +
+                                    "\n headers.key.separator=:" +
+                                    "\n null.marker=   When set, any fields 
(key, value and headers) equal to this will be replaced by null" +
+                                    "\nDefault parsing pattern when:" +
+                                    "\n parse.headers=true and 
parse.key=true:" +
+                                    "\n  \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+                                    "\n parse.key=true:" +
+                                    "\n  \"key\\tvalue\"" +
+                                    "\n parse.headers=true:" +
+                                    "\n  \"h1:v1,h2:v2...\\tvalue\"")
+                    .withRequiredArg()
+                    .describedAs("prop")
+                    .ofType(String.class);
+            readerConfigOpt = parser.accepts("reader-config", "Config 
properties file for the message reader. Note that " + propertyOpt + " takes 
precedence over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+            producerPropertyOpt = parser.accepts("producer-property", "A 
mechanism to pass user-defined properties in the form key=value to the 
producer. ")
+                    .withRequiredArg()
+                    .describedAs("producer_prop")
+                    .ofType(String.class);
+            producerConfigOpt = parser.accepts("producer.config", "Producer 
config properties file. Note that " + producerPropertyOpt + " takes precedence 
over this config.")
+                    .withRequiredArg()
+                    .describedAs("config file")
+                    .ofType(String.class);
+
+            try {
+                options = parser.parse(args);
+            } catch (OptionException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+
+            checkArgs();
+        }
+
+        void checkArgs() {
+            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
read data from standard input and publish it to Kafka.");
+
+            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
+
+            try {
+                
ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt));
+            } catch (IllegalArgumentException e) {
+                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+            }
+        }
+
+        boolean sync() {
+            return options.has(syncOpt);
+        }
+
+        String compressionCodec() {
+            if (options.has(compressionCodecOpt)) {
+                String codecOptValue = options.valueOf(compressionCodecOpt);
+                // Defaults to gzip if no value is provided.
+                return codecOptValue == null || codecOptValue.isEmpty() ? 
CompressionType.GZIP.name : codecOptValue;
+            }
+
+            return CompressionType.NONE.name;
+        }
+
+        String readerClass() {
+            return options.valueOf(messageReaderOpt);
+        }
+
+        Map<String, String> readerProps() throws IOException {
+            Map<String, String> properties = new HashMap<>();
+
+            if (options.has(readerConfigOpt)) {
+                
properties.putAll(propsToStringMap(loadProps(options.valueOf(readerConfigOpt))));
+            }
+
+            properties.put("topic", options.valueOf(topicOpt));
+            
properties.putAll(propsToStringMap(parseKeyValueArgs(options.valuesOf(propertyOpt))));
+
+            return properties;
+        }
+
+        Properties producerProps() throws IOException {
+            Properties props = new Properties();
+
+            if (options.has(producerConfigOpt)) {
+                props.putAll(loadProps(options.valueOf(producerConfigOpt)));
+            }
+
+            
props.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));
+            props.put(BOOTSTRAP_SERVERS_CONFIG, 
options.valueOf(bootstrapServerOpt));
+            props.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
+
+            if (props.getProperty(CLIENT_ID_CONFIG) == null) {
+                props.put(CLIENT_ID_CONFIG, "console-producer");
+            }
+
+            props.put(KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+            props.put(VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+
+            CommandLineUtils.maybeMergeOptions(props, LINGER_MS_CONFIG, 
options, sendTimeoutOpt);
+            CommandLineUtils.maybeMergeOptions(props, ACKS_CONFIG, options, 
requestRequiredAcksOpt);
+            CommandLineUtils.maybeMergeOptions(props, 
REQUEST_TIMEOUT_MS_CONFIG, options, requestTimeoutMsOpt);
+            CommandLineUtils.maybeMergeOptions(props, RETRIES_CONFIG, options, 
messageSendMaxRetriesOpt);
+            CommandLineUtils.maybeMergeOptions(props, RETRY_BACKOFF_MS_CONFIG, 
options, retryBackoffMsOpt);
+            CommandLineUtils.maybeMergeOptions(props, SEND_BUFFER_CONFIG, 
options, socketBufferSizeOpt);
+            CommandLineUtils.maybeMergeOptions(props, BUFFER_MEMORY_CONFIG, 
options, maxMemoryBytesOpt);
+            // We currently have 2 options to set the batch.size value. We'll 
deprecate/remove one of them in KIP-717.
+            CommandLineUtils.maybeMergeOptions(props, BATCH_SIZE_CONFIG, 
options, batchSizeOpt);
+            CommandLineUtils.maybeMergeOptions(props, BATCH_SIZE_CONFIG, 
options, maxPartitionMemoryBytesOpt);
+            CommandLineUtils.maybeMergeOptions(props, METADATA_MAX_AGE_CONFIG, 
options, metadataExpiryMsOpt);
+            CommandLineUtils.maybeMergeOptions(props, MAX_BLOCK_MS_CONFIG, 
options, maxBlockMsOpt);
+
+            return props;
+        }
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/LineMessageReader.java 
b/tools/src/main/java/org/apache/kafka/tools/LineMessageReader.java
new file mode 100644
index 00000000000..13f90d3745e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/LineMessageReader.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.tools.api.RecordReader;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+
+/**
+ * The default implementation of {@link RecordReader} for the {@link 
ConsoleProducer}. This reader comes with
+ * the ability to parse a record's headers, key and value based on 
configurable separators. The reader configuration
+ * is defined as follows:
+ * <p></p>
+ * <pre>
+ *    parse.key             : indicates if a record's key is included in a 
line input and needs to be parsed. (default: false).
+ *    key.separator         : the string separating a record's key from its 
value. (default: \t).
+ *    parse.headers         : indicates if record headers are included in a 
line input and need to be parsed. (default: false).
+ *    headers.delimiter     : the string separating the list of headers from 
the record key. (default: \t).
+ *    headers.separator     : the string separating headers. (default: ,).
+ *    headers.key.separator : the string separating the key and value within a 
header. (default: :).
+ *    ignore.error          : whether best attempts should be made to ignore 
parsing errors. (default: false).
+ *    null.marker           : record key, record value, header key, header 
value which match this marker are replaced by null. (default: null).
+ * </pre>
+ */
+public class LineMessageReader implements RecordReader {
+    private String topic;
+    private boolean parseKey;
+    private String keySeparator = "\t";
+    private boolean parseHeaders;
+    private String headersDelimiter = "\t";
+    private String headersSeparator = ",";
+    private String headersKeySeparator = ":";
+    private boolean ignoreError;
+    private int lineNumber;
+    private final boolean printPrompt = System.console() != null;
+    private Pattern headersSeparatorPattern;
+    private String nullMarker;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        topic = props.get("topic").toString();
+        if (props.containsKey("parse.key"))
+            parseKey = 
props.get("parse.key").toString().trim().equalsIgnoreCase("true");
+        if (props.containsKey("key.separator"))
+            keySeparator = props.get("key.separator").toString();
+        if (props.containsKey("parse.headers"))
+            parseHeaders = 
props.get("parse.headers").toString().trim().equalsIgnoreCase("true");
+        if (props.containsKey("headers.delimiter"))
+            headersDelimiter = props.get("headers.delimiter").toString();
+        if (props.containsKey("headers.separator"))
+            headersSeparator = props.get("headers.separator").toString();
+        headersSeparatorPattern = Pattern.compile(headersSeparator);
+        if (props.containsKey("headers.key.separator"))
+            headersKeySeparator = 
props.get("headers.key.separator").toString();
+        if (props.containsKey("ignore.error"))
+            ignoreError = 
props.get("ignore.error").toString().trim().equalsIgnoreCase("true");
+        if (headersDelimiter.equals(headersSeparator))
+            throw new KafkaException("headers.delimiter and headers.separator 
may not be equal");
+        if (headersDelimiter.equals(headersKeySeparator))
+            throw new KafkaException("headers.delimiter and 
headers.key.separator may not be equal");
+        if (headersSeparator.equals(headersKeySeparator))
+            throw new KafkaException("headers.separator and 
headers.key.separator may not be equal");
+        if (props.containsKey("null.marker"))
+            nullMarker = props.get("null.marker").toString();
+        if (keySeparator.equals(nullMarker))
+            throw new KafkaException("null.marker and key.separator may not be 
equal");
+        if (headersSeparator.equals(nullMarker))
+            throw new KafkaException("null.marker and headers.separator may 
not be equal");
+        if (headersDelimiter.equals(nullMarker))
+            throw new KafkaException("null.marker and headers.delimiter may 
not be equal");
+        if (headersKeySeparator.equals(nullMarker))
+            throw new KafkaException("null.marker and headers.key.separator 
may not be equal");
+    }
+
+    @Override
+    public Iterator<ProducerRecord<byte[], byte[]>> readRecords(InputStream 
inputStream) {
+        return new Iterator<ProducerRecord<byte[], byte[]>>() {
+            private final BufferedReader reader = new BufferedReader(new 
InputStreamReader(inputStream, StandardCharsets.UTF_8));
+            private ProducerRecord<byte[], byte[]> current;
+
+            @Override
+            public boolean hasNext() {
+                if (current != null) {
+                    return true;
+                } else {
+                    lineNumber += 1;
+                    if (printPrompt) {
+                        System.out.print(">");
+                    }
+
+                    String line;
+                    try {
+                        line = reader.readLine();
+                    } catch (IOException e) {
+                        throw new KafkaException(e);
+                    }
+
+                    if (line == null) {
+                        current = null;
+                    } else {
+                        String headers = parse(parseHeaders, line, 0, 
headersDelimiter, "headers delimiter");
+                        int headerOffset = headers == null ? 0 : 
headers.length() + headersDelimiter.length();
+
+                        String key = parse(parseKey, line, headerOffset, 
keySeparator, "key separator");
+                        int keyOffset = key == null ? 0 : key.length() + 
keySeparator.length();
+
+                        String value = line.substring(headerOffset + 
keyOffset);
+
+                        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(
+                                topic,
+                                key != null && !key.equals(nullMarker) ? 
key.getBytes(StandardCharsets.UTF_8) : null,
+                                value != null && !value.equals(nullMarker) ? 
value.getBytes(StandardCharsets.UTF_8) : null
+                        );
+
+                        if (headers != null && !headers.equals(nullMarker)) {
+                            stream(splitHeaders(headers)).forEach(header -> 
record.headers().add(header.key(), header.value()));
+                        }
+                        current = record;
+                    }
+
+                    return current != null;
+                }
+            }
+
+            @Override
+            public ProducerRecord<byte[], byte[]> next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException("no more record");
+                } else {
+                    try {
+                        return current;
+                    } finally {
+                        current = null;
+                    }
+                }
+            }
+        };
+    }
+
+    private String parse(boolean enabled, String line, int startIndex, String 
demarcation, String demarcationName) {
+        if (!enabled) {
+            return null;
+        }
+        int index = line.indexOf(demarcation, startIndex);
+        if (index == -1) {
+            if (ignoreError) {
+                return null;
+            }
+            throw new KafkaException("No " + demarcationName + " found on line 
number " + lineNumber + ": '" + line + "'");
+        }
+        return line.substring(startIndex, index);
+    }
+
+    private Header[] splitHeaders(String headers) {
+        return stream(headersSeparatorPattern.split(headers))
+                .map(pair -> {
+                    int i = pair.indexOf(headersKeySeparator);
+                    if (i == -1) {
+                        if (ignoreError) {
+                            return new RecordHeader(pair, null);
+                        }
+                        throw new KafkaException("No header key separator 
found in pair '" + pair + "' on line number " + lineNumber);
+                    }
+
+                    String headerKey = pair.substring(0, i);
+                    if (headerKey.equals(nullMarker)) {
+                        throw new KafkaException("Header keys should not be 
equal to the null marker '" + nullMarker + "' as they can't be null");
+                    }
+
+                    String value = pair.substring(i + 
headersKeySeparator.length());
+                    byte[] headerValue = value.equals(nullMarker) ? null : 
value.getBytes(StandardCharsets.UTF_8);
+                    return new RecordHeader(headerKey, headerValue);
+                }).toArray(Header[]::new);
+    }
+
+    // Visible for testing
+    String keySeparator() {
+        return keySeparator;
+    }
+
+    // Visible for testing
+    boolean parseKey() {
+        return parseKey;
+    }
+
+    // Visible for testing
+    boolean parseHeaders() {
+        return parseHeaders;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java 
b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index bbc30bea01e..57239ee4b89 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -161,8 +161,7 @@ public class ToolsUtils {
     /**
      * This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
      * It is needed for tools migration (KAFKA-14525), as there is no Java 
equivalent for return type `Nothing`.
-     * Can be removed once [[kafka.tools.ConsoleConsumer]]
-     * and [[kafka.tools.ConsoleProducer]] are migrated.
+     * Can be removed once [[kafka.tools.ConsoleConsumer]] are migrated.
      *
      * @param parser Command line options parser.
      * @param message Error message.
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
new file mode 100644
index 00000000000..3804224ef17
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.utils.TestUtils;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.tools.ConsoleProducer.ConsoleProducerOptions;
+import org.apache.kafka.tools.api.RecordReader;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsoleProducerTest {
+    private static final String[] BOOTSTRAP_SERVER_VALID_ARGS = new String[]{
+        "--bootstrap-server", "localhost:1003,localhost:1004",
+        "--topic", "t3",
+        "--property", "parse.key=true",
+        "--property", "key.separator=#"
+    };
+    private static final String[] INVALID_ARGS = new String[]{
+        "--t", // not a valid argument
+        "t3"
+    };
+    private static final String[] BOOTSTRAP_SERVER_OVERRIDE = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+    };
+    private static final String[] CLIENT_ID_OVERRIDE = new String[]{
+        "--bootstrap-server", "localhost:1001",
+        "--topic", "t3",
+        "--producer-property", "client.id=producer-1"
+    };
+    private static final String[] 
BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+        "--batch-size", "123",
+        "--max-partition-memory-bytes", "456"
+    };
+    private static final String[] 
BATCH_SIZE_SET_AND_MAX_PARTITION_MEMORY_BYTES_NOT_SET = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+        "--batch-size", "123"
+    };
+    private static final String[] 
BATCH_SIZE_NOT_SET_AND_MAX_PARTITION_MEMORY_BYTES_SET = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+        "--max-partition-memory-bytes", "456"
+    };
+    private static final String[] BATCH_SIZE_DEFAULT = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3"
+    };
+    private static final String[] TEST_RECORD_READER = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+        "--line-reader", TestRecordReader.class.getName()
+    };
+
+    @Test
+    public void testValidConfigsBootstrapServer() throws IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(BOOTSTRAP_SERVER_VALID_ARGS);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals(asList("localhost:1003", "localhost:1004"), 
producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    }
+
+    @Test
+    public void testInvalidConfigs() {
+        Exit.setExitProcedure((statusCode, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleProducerOptions(INVALID_ARGS));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void testParseKeyProp() throws ReflectiveOperationException, 
IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(BOOTSTRAP_SERVER_VALID_ARGS);
+        LineMessageReader reader = (LineMessageReader) 
Class.forName(opts.readerClass()).getDeclaredConstructor().newInstance();
+        reader.configure(opts.readerProps());
+
+        assertEquals("#", reader.keySeparator());
+        assertTrue(reader.parseKey());
+    }
+
+    @Test
+    public void testParseReaderConfigFile() throws Exception {
+        File propsFile = TestUtils.tempFile();
+        OutputStream propsStream = Files.newOutputStream(propsFile.toPath());
+        propsStream.write("parse.key=true\n".getBytes());
+        propsStream.write("key.separator=|".getBytes());
+        propsStream.close();
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--property", "key.separator=;",
+            "--property", "parse.headers=true",
+            "--reader-config", propsFile.getAbsolutePath()
+        };
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
+        LineMessageReader reader = (LineMessageReader) 
Class.forName(opts.readerClass()).getDeclaredConstructor().newInstance();
+        reader.configure(opts.readerProps());
+
+        assertEquals(";", reader.keySeparator());
+        assertTrue(reader.parseKey());
+        assertTrue(reader.parseHeaders());
+    }
+
+    @Test
+    public void testBootstrapServerOverride() throws IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(BOOTSTRAP_SERVER_OVERRIDE);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals(Collections.singletonList("localhost:1002"), 
producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    }
+
+    @Test
+    public void testClientIdOverride() throws IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(CLIENT_ID_OVERRIDE);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals("producer-1", 
producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testDefaultClientId() throws IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(BOOTSTRAP_SERVER_VALID_ARGS);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals("console-producer", 
producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testBatchSizeOverriddenByMaxPartitionMemoryBytesValue() throws 
IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals(456, 
producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testBatchSizeSetAndMaxPartitionMemoryBytesNotSet() throws 
IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(BATCH_SIZE_SET_AND_MAX_PARTITION_MEMORY_BYTES_NOT_SET);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals(123, 
producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testDefaultBatchSize() throws IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(BATCH_SIZE_DEFAULT);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals(16 * 1024, 
producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testBatchSizeNotSetAndMaxPartitionMemoryBytesSet() throws 
IOException {
+        ConsoleProducerOptions opts = new 
ConsoleProducerOptions(BATCH_SIZE_NOT_SET_AND_MAX_PARTITION_MEMORY_BYTES_SET);
+        ProducerConfig producerConfig = new 
ProducerConfig(opts.producerProps());
+
+        assertEquals(456, 
producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testNewReader() throws Exception {
+        ConsoleProducer producer = new ConsoleProducer();
+        TestRecordReader reader = (TestRecordReader) 
producer.messageReader(new ConsoleProducerOptions(TEST_RECORD_READER));
+
+        assertEquals(1, reader.configureCount());
+        assertEquals(0, reader.closeCount());
+
+        reader.close();
+        assertEquals(1, reader.closeCount());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testLoopReader() throws Exception {
+        ConsoleProducer producer = new ConsoleProducer();
+        TestRecordReader reader = (TestRecordReader) 
producer.messageReader(new ConsoleProducerOptions(TEST_RECORD_READER));
+
+        producer.loopReader(Mockito.mock(Producer.class), reader, false);
+
+        assertEquals(1, reader.configureCount());
+        assertEquals(1, reader.closeCount());
+    }
+
+    public static class TestRecordReader implements RecordReader {
+        private int configureCount = 0;
+        private int closeCount = 0;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            configureCount += 1;
+        }
+
+        @Override
+        public Iterator<ProducerRecord<byte[], byte[]>> 
readRecords(InputStream inputStream) {
+            return Collections.emptyIterator();
+        }
+
+        @Override
+        public void close() {
+            closeCount += 1;
+        }
+
+        public int configureCount() {
+            return configureCount;
+        }
+
+        public int closeCount() {
+            return closeCount;
+        }
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/LineMessageReaderTest.java 
b/tools/src/test/java/org/apache/kafka/tools/LineMessageReaderTest.java
new file mode 100644
index 00000000000..5864869df1a
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/LineMessageReaderTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.tools.api.RecordReader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.propsToStringMap;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class LineMessageReaderTest {
+    @Test
+    public void testLineReader() {
+        String input = "key0\tvalue0\nkey1\tvalue1";
+
+        Properties props = defaultTestProps();
+        props.put("parse.headers", "false");
+
+        runTest(props, input, record("key0", "value0"), record("key1", 
"value1"));
+    }
+
+    @Test
+    public void testLineReaderHeader() {
+        String input = 
"headerKey0:headerValue0,headerKey1:headerValue1\tkey0\tvalue0\n";
+        ProducerRecord<String, String> expected = record(
+                "key0",
+                "value0",
+                asList(
+                        new RecordHeader("headerKey0", 
"headerValue0".getBytes(UTF_8)),
+                        new RecordHeader("headerKey1", 
"headerValue1".getBytes(UTF_8))
+                )
+        );
+        runTest(defaultTestProps(), input, expected);
+    }
+
+    @Test
+    public void testMinimalValidInputWithHeaderKeyAndValue() {
+        runTest(defaultTestProps(), ":\t\t", record("", "", singletonList(new 
RecordHeader("", "".getBytes(UTF_8)))));
+    }
+
+    @Test
+    public void testKeyMissingValue() {
+        Properties props = defaultTestProps();
+        props.put("parse.headers", "false");
+        runTest(props, "key\t", record("key", ""));
+    }
+
+    @Test
+    public void testDemarcationsLongerThanOne() {
+        Properties props = defaultTestProps();
+        props.put("key.separator", "\t\t");
+        props.put("headers.delimiter", "\t\t");
+        props.put("headers.separator", "---");
+        props.put("headers.key.separator", "::::");
+
+        runTest(
+                props,
+                
"headerKey0.0::::headerValue0.0---headerKey1.0::::\t\tkey\t\tvalue",
+                record("key",
+                        "value",
+                        asList(
+                                new RecordHeader("headerKey0.0", 
"headerValue0.0".getBytes(UTF_8)),
+                                new RecordHeader("headerKey1.0", 
"".getBytes(UTF_8))
+                        )
+                )
+        );
+    }
+
+    @Test
+    public void testLineReaderHeaderNoKey() {
+        String input = "headerKey:headerValue\tvalue\n";
+
+        Properties props = defaultTestProps();
+        props.put("parse.key", "false");
+
+        runTest(props, input, record(null, "value", singletonList(new 
RecordHeader("headerKey", "headerValue".getBytes(UTF_8)))));
+    }
+
+    @Test
+    public void testLineReaderOnlyValue() {
+        Properties props = defaultTestProps();
+        props.put("parse.key", "false");
+        props.put("parse.headers", "false");
+
+        runTest(props, "value\n", record(null, "value"));
+    }
+
+    @Test
+    public void 
testParseHeaderEnabledWithCustomDelimiterAndVaryingNumberOfKeyValueHeaderPairs()
 {
+        Properties props = defaultTestProps();
+        props.put("key.separator", "#");
+        props.put("headers.delimiter", "!");
+        props.put("headers.separator", "&");
+        props.put("headers.key.separator", ":");
+
+        String input =
+                
"headerKey0.0:headerValue0.0&headerKey0.1:headerValue0.1!key0#value0\n" +
+                        "headerKey1.0:headerValue1.0!key1#value1";
+
+        ProducerRecord<String, String> record0 = record(
+                "key0",
+                "value0",
+                asList(
+                        new RecordHeader("headerKey0.0", 
"headerValue0.0".getBytes(UTF_8)),
+                        new RecordHeader("headerKey0.1", 
"headerValue0.1".getBytes(UTF_8))
+                )
+        );
+        ProducerRecord<String, String> record1 = record(
+                "key1",
+                "value1",
+                singletonList(new RecordHeader("headerKey1.0", 
"headerValue1.0".getBytes(UTF_8)))
+        );
+
+        runTest(props, input, record0, record1);
+    }
+
+    @Test
+    public void testMissingKeySeparator() {
+        RecordReader lineReader = new LineMessageReader();
+        String input =
+                
"headerKey0.0:headerValue0.0,headerKey0.1:headerValue0.1\tkey0\tvalue0\n" +
+                        
"headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1";
+
+        lineReader.configure(propsToStringMap(defaultTestProps()));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = 
lineReader.readRecords(new ByteArrayInputStream(input.getBytes()));
+        iter.next();
+
+        KafkaException expectedException = assertThrows(KafkaException.class, 
iter::next);
+
+        assertEquals(
+                "No key separator found on line number 2: 
'headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1'",
+                expectedException.getMessage()
+        );
+    }
+
+    @Test
+    public void testMissingHeaderKeySeparator() {
+        RecordReader lineReader = new LineMessageReader();
+        String input = "key[MISSING-DELIMITER]val\tkey0\tvalue0\n";
+        lineReader.configure(propsToStringMap(defaultTestProps()));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = 
lineReader.readRecords(new ByteArrayInputStream(input.getBytes()));
+
+        KafkaException expectedException = assertThrows(KafkaException.class, 
iter::next);
+
+        assertEquals(
+                "No header key separator found in pair 
'key[MISSING-DELIMITER]val' on line number 1",
+                expectedException.getMessage()
+        );
+    }
+
+    @Test
+    public void testHeaderDemarcationCollision() {
+        Properties props = defaultTestProps();
+        props.put("headers.delimiter", "\t");
+        props.put("headers.separator", "\t");
+        props.put("headers.key.separator", "\t");
+
+        assertThrowsOnInvalidPatternConfig(props, "headers.delimiter and 
headers.separator may not be equal");
+
+        props.put("headers.separator", ",");
+        assertThrowsOnInvalidPatternConfig(props, "headers.delimiter and 
headers.key.separator may not be equal");
+
+        props.put("headers.key.separator", ",");
+        assertThrowsOnInvalidPatternConfig(props, "headers.separator and 
headers.key.separator may not be equal");
+    }
+
+    @Test
+    public void testIgnoreErrorInInput() {
+        String input =
+                "headerKey0.0:headerValue0.0\tkey0\tvalue0\n" +
+                        
"headerKey1.0:headerValue1.0,headerKey1.1:headerValue1.1[MISSING-HEADER-DELIMITER]key1\tvalue1\n"
 +
+                        
"headerKey2.0:headerValue2.0\tkey2[MISSING-KEY-DELIMITER]value2\n" +
+                        
"headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3\n";
+
+        Properties props = defaultTestProps();
+        props.put("ignore.error", "true");
+
+        ProducerRecord<String, String> validRecord = record("key0", "value0",
+                singletonList(new RecordHeader("headerKey0.0", 
"headerValue0.0".getBytes(UTF_8))));
+
+        ProducerRecord<String, String> missingHeaderDelimiter = record(
+                null,
+                "value1",
+                asList(
+                        new RecordHeader("headerKey1.0", 
"headerValue1.0".getBytes(UTF_8)),
+                        new RecordHeader("headerKey1.1", 
"headerValue1.1[MISSING-HEADER-DELIMITER]key1".getBytes(UTF_8))
+                )
+        );
+
+        ProducerRecord<String, String> missingKeyDelimiter = record(
+                null,
+                "key2[MISSING-KEY-DELIMITER]value2",
+                singletonList(new RecordHeader("headerKey2.0", 
"headerValue2.0".getBytes(UTF_8)))
+        );
+
+        ProducerRecord<String, String> missingKeyHeaderDelimiter = record(
+                null,
+                
"headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3",
+                Collections.emptyList()
+        );
+
+        runTest(props, input, validRecord, missingHeaderDelimiter, 
missingKeyDelimiter, missingKeyHeaderDelimiter);
+    }
+
+    @Test
+    public void testMalformedHeaderIgnoreError() {
+        String input = "key-val\tkey0\tvalue0\n";
+
+        Properties props = defaultTestProps();
+        props.put("ignore.error", "true");
+
+        ProducerRecord<String, String> expected = record("key0", "value0", 
singletonList(new RecordHeader("key-val", null)));
+
+        runTest(props, input, expected);
+    }
+
+    @Test
+    public void testNullMarker() {
+        String input =
+                "key\t\n" +
+                        "key\t<NULL>\n" +
+                        "key\t<NULL>value\n" +
+                        "<NULL>\tvalue\n" +
+                        "<NULL>\t<NULL>";
+
+        Properties props = defaultTestProps();
+        props.put("null.marker", "<NULL>");
+        props.put("parse.headers", "false");
+        runTest(props, input,
+                record("key", ""),
+                record("key", null),
+                record("key", "<NULL>value"),
+                record(null, "value"),
+                record(null, null));
+
+        // If the null marker is not set
+        props.remove("null.marker");
+        runTest(props, input,
+                record("key", ""),
+                record("key", "<NULL>"),
+                record("key", "<NULL>value"),
+                record("<NULL>", "value"),
+                record("<NULL>", "<NULL>"));
+    }
+
+    @Test
+    public void testNullMarkerWithHeaders() {
+        String input =
+                "h0:v0,h1:v1\t<NULL>\tvalue\n" +
+                        "<NULL>\tkey\t<NULL>\n" +
+                        "h0:,h1:v1\t<NULL>\t<NULL>\n" +
+                        "h0:<NULL>,h1:v1\tkey\t<NULL>\n" +
+                        "h0:<NULL>,h1:<NULL>value\tkey\t<NULL>\n";
+        Header header = new RecordHeader("h1", "v1".getBytes(UTF_8));
+
+        Properties props = defaultTestProps();
+        props.put("null.marker", "<NULL>");
+        runTest(props, input,
+                record(null, "value", asList(new RecordHeader("h0", 
"v0".getBytes(UTF_8)), header)),
+                record("key", null),
+                record(null, null, asList(new RecordHeader("h0", 
"".getBytes(UTF_8)), header)),
+                record("key", null, asList(new RecordHeader("h0", null), 
header)),
+                record("key", null, asList(new RecordHeader("h0", null), new 
RecordHeader("h1", "<NULL>value".getBytes(UTF_8))))
+        );
+
+        // If the null marker is not set
+        RecordReader lineReader = new LineMessageReader();
+        props.remove("null.marker");
+        lineReader.configure(propsToStringMap(props));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = 
lineReader.readRecords(new ByteArrayInputStream(input.getBytes()));
+        assertRecordEquals(record("<NULL>", "value", asList(new 
RecordHeader("h0", "v0".getBytes(UTF_8)), header)), iter.next());
+        // line 2 is not valid anymore
+        KafkaException expectedException = assertThrows(KafkaException.class, 
iter::next);
+        assertEquals(
+                "No header key separator found in pair '<NULL>' on line number 
2",
+                expectedException.getMessage()
+        );
+        assertRecordEquals(record("<NULL>", "<NULL>", asList(new 
RecordHeader("h0", "".getBytes(UTF_8)), header)), iter.next());
+        assertRecordEquals(record("key", "<NULL>", asList(new 
RecordHeader("h0", "<NULL>".getBytes(UTF_8)), header)), iter.next());
+        assertRecordEquals(record("key", "<NULL>", asList(
+                new RecordHeader("h0", "<NULL>".getBytes(UTF_8)),
+                new RecordHeader("h1", "<NULL>value".getBytes(UTF_8)))), 
iter.next()
+        );
+    }
+
+    @Test
+    public void testNullMarkerHeaderKeyThrows() {
+        String input = "<NULL>:v0,h1:v1\tkey\tvalue\n";
+
+        Properties props = defaultTestProps();
+        props.put("null.marker", "<NULL>");
+        RecordReader lineReader = new LineMessageReader();
+        lineReader.configure(propsToStringMap(props));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = 
lineReader.readRecords(new ByteArrayInputStream(input.getBytes()));
+        KafkaException expectedException = assertThrows(KafkaException.class, 
iter::next);
+        assertEquals(
+                "Header keys should not be equal to the null marker '<NULL>' 
as they can't be null",
+                expectedException.getMessage()
+        );
+
+        // If the null marker is not set
+        props.remove("null.marker");
+        runTest(props, input, record("key", "value", asList(
+                new RecordHeader("<NULL>", "v0".getBytes(UTF_8)),
+                new RecordHeader("h1", "v1".getBytes(UTF_8))))
+        );
+    }
+
+    @Test
+    public void testInvalidNullMarker() {
+        Properties props = defaultTestProps();
+        props.put("headers.delimiter", "-");
+        props.put("headers.separator", ":");
+        props.put("headers.key.separator", "/");
+
+        props.put("null.marker", "-");
+        assertThrowsOnInvalidPatternConfig(props, "null.marker and 
headers.delimiter may not be equal");
+
+        props.put("null.marker", ":");
+        assertThrowsOnInvalidPatternConfig(props, "null.marker and 
headers.separator may not be equal");
+
+        props.put("null.marker", "/");
+        assertThrowsOnInvalidPatternConfig(props, "null.marker and 
headers.key.separator may not be equal");
+    }
+
+    private Properties defaultTestProps() {
+        Properties props = new Properties();
+        props.put("topic", "topic");
+        props.put("parse.key", "true");
+        props.put("parse.headers", "true");
+
+        return props;
+    }
+
+    private void assertThrowsOnInvalidPatternConfig(Properties props, String 
expectedMessage) {
+        KafkaException exception = assertThrows(KafkaException.class, () -> 
new LineMessageReader().configure(propsToStringMap(props)));
+        assertEquals(expectedMessage, exception.getMessage());
+    }
+
+    @SafeVarargs
+    private final void runTest(Properties props, String input, 
ProducerRecord<String, String>... expectedRecords) {
+        RecordReader lineReader = new LineMessageReader();
+        lineReader.configure(propsToStringMap(props));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = 
lineReader.readRecords(new ByteArrayInputStream(input.getBytes()));
+
+        for (ProducerRecord<String, String> record : expectedRecords) {
+            assertRecordEquals(record, iter.next());
+        }
+
+        assertFalse(iter.hasNext());
+        assertThrows(NoSuchElementException.class, iter::next);
+    }
+
+    //  The equality method of ProducerRecord compares memory references for 
the header iterator, this is why this custom equality check is used.
+    private <K, V> void assertRecordEquals(ProducerRecord<K, V> expected, 
ProducerRecord<byte[], byte[]> actual) {
+        assertEquals(expected.key(), actual.key() == null ? null : new 
String(actual.key()));
+        assertEquals(expected.value(), actual.value() == null ? null : new 
String(actual.value()));
+        assertArrayEquals(expected.headers().toArray(), 
actual.headers().toArray());
+    }
+
+    private <K, V> ProducerRecord<K, V> record(K key, V value, List<Header> 
headers) {
+        ProducerRecord<K, V> record = new ProducerRecord<>("topic", key, 
value);
+        headers.forEach(h -> record.headers().add(h.key(), h.value()));
+
+        return record;
+    }
+
+    private <K, V> ProducerRecord<K, V> record(K key, V value) {
+        return new ProducerRecord<>("topic", key, value);
+    }
+}
diff --git 
a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java 
b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java
index 305e68af7ad..f8340ddcda1 100644
--- a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java
+++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java
@@ -28,7 +28,7 @@ import java.util.Map;
  * Typical implementations of this interface convert data from an 
`InputStream` received via `readRecords` into a
  * iterator of `ProducerRecord` instance. Note that implementations must have 
a public nullary constructor.
  *
- * This is used by the `kafka.tools.ConsoleProducer`.
+ * This is used by the `org.apache.kafka.tools.ConsoleProducer`.
  */
 public interface RecordReader extends Closeable, Configurable {
 

Reply via email to