This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 672c6172334 KAFKA-14577: Move ConsoleProducer to tools module (#17157)
672c6172334 is described below
commit 672c6172334f5c1c25702abaa174eb25858af286
Author: Dmitry Werner <[email protected]>
AuthorDate: Mon Oct 7 14:19:59 2024 +0500
KAFKA-14577: Move ConsoleProducer to tools module (#17157)
Reviewers: Mickael Maison <[email protected]>, Federico Valeri <[email protected]>
---
bin/kafka-console-producer.sh | 2 +-
bin/windows/kafka-console-producer.bat | 2 +-
checkstyle/import-control.xml | 2 +
checkstyle/suppressions.xml | 2 +-
.../main/scala/kafka/tools/ConsoleProducer.scala | 426 ---------------------
core/src/main/scala/kafka/utils/ToolsUtils.scala | 16 +-
.../unit/kafka/tools/ConsoleProducerTest.scala | 259 -------------
.../unit/kafka/tools/LineMessageReaderTest.scala | 354 -----------------
.../native-image-configs/reflect-config.json | 2 +-
.../org/apache/kafka/tools/ConsoleProducer.java | 349 +++++++++++++++++
.../org/apache/kafka/tools/LineMessageReader.java | 218 +++++++++++
.../java/org/apache/kafka/tools/ToolsUtils.java | 3 +-
.../apache/kafka/tools/ConsoleProducerTest.java | 252 ++++++++++++
.../apache/kafka/tools/LineMessageReaderTest.java | 403 +++++++++++++++++++
.../org/apache/kafka/tools/api/RecordReader.java | 2 +-
15 files changed, 1231 insertions(+), 1061 deletions(-)
diff --git a/bin/kafka-console-producer.sh b/bin/kafka-console-producer.sh
index e5187b8b533..a0929407c0a 100755
--- a/bin/kafka-console-producer.sh
+++ b/bin/kafka-console-producer.sh
@@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
-exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ConsoleProducer "$@"
diff --git a/bin/windows/kafka-console-producer.bat b/bin/windows/kafka-console-producer.bat
index e1834bc5a85..d572c7af04e 100644
--- a/bin/windows/kafka-console-producer.bat
+++ b/bin/windows/kafka-console-producer.bat
@@ -16,5 +16,5 @@ rem limitations under the License.
SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
-"%~dp0kafka-run-class.bat" kafka.tools.ConsoleProducer %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ConsoleProducer %*
EndLocal
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 74c99f834e7..834d51b7a89 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -312,6 +312,8 @@
<allow pkg="scala.collection" />
<allow pkg="org.apache.kafka.coordinator.transaction" />
<allow pkg="org.apache.kafka.coordinator.group" />
+ <allow pkg="org.apache.kafka.tools" />
+ <allow pkg="org.apache.kafka.tools.api" />
<subpackage name="consumer">
<allow pkg="org.apache.kafka.tools"/>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 099d99a80de..4b243252f67 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -271,7 +271,7 @@
<suppress checks="BooleanExpressionComplexity"
files="(StreamsResetter|DefaultMessageFormatter).java"/>
<suppress checks="NPathComplexity"
- files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/>
+ files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
<suppress checks="ImportControl"
files="SignalLogger.java"/>
<suppress checks="IllegalImport"
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
deleted file mode 100644
index 64df24e4270..00000000000
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ /dev/null
@@ -1,426 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import java.io._
-import java.nio.charset.StandardCharsets
-import java.util.Properties
-import java.util.regex.Pattern
-import joptsimple.{OptionException, OptionParser, OptionSet, OptionSpec}
-import kafka.utils.Implicits._
-import kafka.utils.{Logging, ToolsUtils}
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.clients.producer.{KafkaProducer, Producer,
ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.utils.{Exit, Utils}
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-import org.apache.kafka.tools.api.RecordReader
-
-import java.lang
-
-object ConsoleProducer extends Logging {
-
- private[tools] def newReader(className: String, prop: Properties):
RecordReader = {
- val reader =
Class.forName(className).getDeclaredConstructor().newInstance()
- reader match {
- case r: RecordReader =>
- r.configure(prop.asInstanceOf[java.util.Map[String, _]])
- r
- case _ => throw new IllegalArgumentException(f"the reader must implement
${classOf[RecordReader].getName}")
- }
- }
-
- private[tools] def loopReader(producer: Producer[Array[Byte], Array[Byte]],
- reader: RecordReader,
- inputStream: InputStream,
- sync: Boolean): Unit = {
- val iter = reader.readRecords(inputStream)
- try while (iter.hasNext) send(producer, iter.next(), sync) finally
reader.close()
- }
-
- def main(args: Array[String]): Unit = {
-
- try {
- val config = new ProducerConfig(args)
- val input = System.in
- val producer = new KafkaProducer[Array[Byte],
Array[Byte]](producerProps(config))
- try loopReader(producer, newReader(config.readerClass,
getReaderProps(config)), input, config.sync)
- finally producer.close()
- Exit.exit(0)
- } catch {
- case e: joptsimple.OptionException =>
- System.err.println(e.getMessage)
- Exit.exit(1)
- case e: Exception =>
- e.printStackTrace()
- Exit.exit(1)
- }
- }
-
- private def send(producer: Producer[Array[Byte], Array[Byte]],
- record: ProducerRecord[Array[Byte], Array[Byte]],
sync: Boolean): Unit = {
- if (sync)
- producer.send(record).get()
- else
- producer.send(record, new ErrorLoggingCallback(record.topic, record.key,
record.value, false))
- }
-
- def getReaderProps(config: ProducerConfig): Properties = {
- val props =
- if (config.options.has(config.readerConfigOpt))
- Utils.loadProps(config.options.valueOf(config.readerConfigOpt))
- else new Properties
- props.put("topic", config.topic)
- props ++= config.cmdLineProps
- props
- }
-
- def producerProps(config: ProducerConfig): Properties = {
- val props =
- if (config.options.has(config.producerConfigOpt))
- Utils.loadProps(config.options.valueOf(config.producerConfigOpt))
- else new Properties
-
- props ++= config.extraProducerProps
-
- if (config.bootstrapServer != null)
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
config.bootstrapServer)
- else
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
-
- props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
- if (props.getProperty(ProducerConfig.CLIENT_ID_CONFIG) == null)
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer")
-
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.LINGER_MS_CONFIG, config.options,
config.sendTimeoutOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.ACKS_CONFIG, config.options,
config.requestRequiredAcksOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.options,
config.requestTimeoutMsOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.RETRIES_CONFIG, config.options,
config.messageSendMaxRetriesOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.options,
config.retryBackoffMsOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.SEND_BUFFER_CONFIG, config.options,
config.socketBufferSizeOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.BUFFER_MEMORY_CONFIG, config.options,
config.maxMemoryBytesOpt)
- // We currently have 2 options to set the batch.size value. We'll
deprecate/remove one of them in KIP-717.
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.BATCH_SIZE_CONFIG, config.options,
config.batchSizeOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.BATCH_SIZE_CONFIG, config.options,
config.maxPartitionMemoryBytesOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.METADATA_MAX_AGE_CONFIG, config.options,
config.metadataExpiryMsOpt)
- CommandLineUtils.maybeMergeOptions(
- props, ProducerConfig.MAX_BLOCK_MS_CONFIG, config.options,
config.maxBlockMsOpt)
-
- props
- }
-
- class ProducerConfig(args: Array[String]) extends
CommandDefaultOptions(args) {
- val topicOpt: OptionSpec[String] = parser.accepts("topic", "REQUIRED: The
topic id to produce messages to.")
- .withRequiredArg
- .describedAs("topic")
- .ofType(classOf[String])
- private val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use
--bootstrap-server instead; ignored if --bootstrap-server is specified. The
broker list string in the form HOST1:PORT1,HOST2:PORT2.")
- .withRequiredArg
- .describedAs("broker-list")
- .ofType(classOf[String])
- val bootstrapServerOpt: OptionSpec[String] =
parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated)
is specified. The server(s) to connect to. The broker list string in the form
HOST1:PORT1,HOST2:PORT2.")
- .requiredUnless("broker-list")
- .withRequiredArg
- .describedAs("server to connect to")
- .ofType(classOf[String])
- private val syncOpt = parser.accepts("sync", "If set message send requests
to the brokers are synchronously, one at a time as they arrive.")
- val compressionCodecOpt: OptionSpec[String] =
parser.accepts("compression-codec", "The compression codec: either 'none',
'gzip', 'snappy', 'lz4', or 'zstd'." +
- "If
specified without value, then it defaults to 'gzip'")
- .withOptionalArg()
- .describedAs("compression-codec")
- .ofType(classOf[String])
- val batchSizeOpt: OptionSpec[Integer] = parser.accepts("batch-size",
"Number of messages to send in a single batch if they are not being sent
synchronously. "+
- "please note that this option will be replaced if
max-partition-memory-bytes is also set")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(16 * 1024)
- val messageSendMaxRetriesOpt: OptionSpec[Integer] =
parser.accepts("message-send-max-retries", "Brokers can fail receiving the
message for multiple reasons, " +
- "and being unavailable transiently is just one of them. This property
specifies the number of retries before the producer give up and drop this
message. " +
- "This is the option to control `retries` in producer configs.")
- .withRequiredArg
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(3)
- val retryBackoffMsOpt: OptionSpec[lang.Long] =
parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes
the metadata of relevant topics. " +
- "Since leader election takes a bit of time, this property specifies the
amount of time that the producer waits before refreshing the metadata. " +
- "This is the option to control `retry.backoff.ms` in producer configs.")
- .withRequiredArg
- .ofType(classOf[java.lang.Long])
- .defaultsTo(100)
- val sendTimeoutOpt: OptionSpec[lang.Long] = parser.accepts("timeout", "If
set and the producer is running in asynchronous mode, this gives the maximum
amount of time" +
- " a message will queue awaiting sufficient batch size. The value is
given in ms. " +
- "This is the option to control `linger.ms` in producer configs.")
- .withRequiredArg
- .describedAs("timeout_ms")
- .ofType(classOf[java.lang.Long])
- .defaultsTo(1000)
- val requestRequiredAcksOpt: OptionSpec[String] =
parser.accepts("request-required-acks", "The required `acks` of the producer
requests")
- .withRequiredArg
- .describedAs("request required acks")
- .ofType(classOf[java.lang.String])
- .defaultsTo("-1")
- val requestTimeoutMsOpt: OptionSpec[Integer] =
parser.accepts("request-timeout-ms", "The ack timeout of the producer requests.
Value must be non-negative and non-zero.")
- .withRequiredArg
- .describedAs("request timeout ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1500)
- val metadataExpiryMsOpt: OptionSpec[lang.Long] =
parser.accepts("metadata-expiry-ms",
- "The period of time in milliseconds after which we force a refresh of
metadata even if we haven't seen any leadership changes. " +
- "This is the option to control `metadata.max.age.ms` in producer
configs.")
- .withRequiredArg
- .describedAs("metadata expiration interval")
- .ofType(classOf[java.lang.Long])
- .defaultsTo(5*60*1000L)
- val maxBlockMsOpt: OptionSpec[lang.Long] = parser.accepts("max-block-ms",
- "The max time that the producer will block for during a send request.")
- .withRequiredArg
- .describedAs("max block on send")
- .ofType(classOf[java.lang.Long])
- .defaultsTo(60*1000L)
- val maxMemoryBytesOpt: OptionSpec[lang.Long] =
parser.accepts("max-memory-bytes",
- "The total memory used by the producer to buffer records waiting to be
sent to the server. " +
- "This is the option to control `buffer.memory` in producer configs.")
- .withRequiredArg
- .describedAs("total memory in bytes")
- .ofType(classOf[java.lang.Long])
- .defaultsTo(32 * 1024 * 1024L)
- val maxPartitionMemoryBytesOpt: OptionSpec[Integer] =
parser.accepts("max-partition-memory-bytes",
- "The buffer size allocated for a partition. When records are received
which are smaller than this size the producer " +
- "will attempt to optimistically group them together until this size is
reached. " +
- "This is the option to control `batch.size` in producer configs.")
- .withRequiredArg
- .describedAs("memory in bytes per partition")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(16 * 1024)
- private val messageReaderOpt = parser.accepts("line-reader", "The class
name of the class to use for reading lines from standard in. " +
- "By default each line is read as a separate message.")
- .withRequiredArg
- .describedAs("reader_class")
- .ofType(classOf[java.lang.String])
- .defaultsTo(classOf[LineMessageReader].getName)
- val socketBufferSizeOpt: OptionSpec[Integer] =
parser.accepts("socket-buffer-size", "The size of the tcp RECV size. " +
- "This is the option to control `send.buffer.bytes` in producer configs.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1024*100)
- private val propertyOpt = parser.accepts("property",
- """A mechanism to pass user-defined properties in the form key=value to
the message reader. This allows custom configuration for a user-defined message
reader.
- |Default properties include:
- | parse.key=false
- | parse.headers=false
- | ignore.error=false
- | key.separator=\t
- | headers.delimiter=\t
- | headers.separator=,
- | headers.key.separator=:
- | null.marker= When set, any fields (key, value and headers) equal
to this will be replaced by null
- |Default parsing pattern when:
- | parse.headers=true and parse.key=true:
- | "h1:v1,h2:v2...\tkey\tvalue"
- | parse.key=true:
- | "key\tvalue"
- | parse.headers=true:
- | "h1:v1,h2:v2...\tvalue"
- """.stripMargin
- )
- .withRequiredArg
- .describedAs("prop")
- .ofType(classOf[String])
- val readerConfigOpt: OptionSpec[String] = parser.accepts("reader-config",
s"Config properties file for the message reader. Note that $propertyOpt takes
precedence over this config.")
- .withRequiredArg
- .describedAs("config file")
- .ofType(classOf[String])
- private val producerPropertyOpt = parser.accepts("producer-property", "A
mechanism to pass user-defined properties in the form key=value to the
producer. ")
- .withRequiredArg
- .describedAs("producer_prop")
- .ofType(classOf[String])
- val producerConfigOpt: OptionSpec[String] =
parser.accepts("producer.config", s"Producer config properties file. Note that
$producerPropertyOpt takes precedence over this config.")
- .withRequiredArg
- .describedAs("config file")
- .ofType(classOf[String])
-
- options = tryParse(parser, args)
-
- CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read
data from standard input and publish it to Kafka.")
-
- CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
-
- val topic: String = options.valueOf(topicOpt)
-
- val bootstrapServer: String = options.valueOf(bootstrapServerOpt)
- val brokerList: String = options.valueOf(brokerListOpt)
-
- val brokerHostsAndPorts: String = options.valueOf(if
(options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
- ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts)
-
- val sync: Boolean = options.has(syncOpt)
- private val compressionCodecOptionValue =
options.valueOf(compressionCodecOpt)
- val compressionCodec: String = if (options.has(compressionCodecOpt))
- if (compressionCodecOptionValue == null ||
compressionCodecOptionValue.isEmpty)
- CompressionType.GZIP.name
- else compressionCodecOptionValue
- else CompressionType.NONE.name
- val readerClass: String = options.valueOf(messageReaderOpt)
- val cmdLineProps: Properties =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
- val extraProducerProps: Properties =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
-
- def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
- try
- parser.parse(args: _*)
- catch {
- case e: OptionException =>
- ToolsUtils.printUsageAndExit(parser, e.getMessage)
- }
- }
- }
-
- class LineMessageReader extends RecordReader {
- var topic: String = _
- var parseKey: Boolean = false
- var keySeparator: String = "\t"
- var parseHeaders: Boolean = false
- private var headersDelimiter = "\t"
- var headersSeparator: String = ","
- private var headersKeySeparator = ":"
- private var ignoreError = false
- private var lineNumber = 0
- private val printPrompt = System.console != null
- private var headersSeparatorPattern: Pattern = _
- private var nullMarker: String = _
-
- override def configure(props: java.util.Map[String, _]): Unit = {
- topic = props.get("topic").toString
- if (props.containsKey("parse.key"))
- parseKey =
props.get("parse.key").toString.trim.equalsIgnoreCase("true")
- if (props.containsKey("key.separator"))
- keySeparator = props.get("key.separator").toString
- if (props.containsKey("parse.headers"))
- parseHeaders =
props.get("parse.headers").toString.trim.equalsIgnoreCase("true")
- if (props.containsKey("headers.delimiter"))
- headersDelimiter = props.get("headers.delimiter").toString
- if (props.containsKey("headers.separator"))
- headersSeparator = props.get("headers.separator").toString
- headersSeparatorPattern = Pattern.compile(headersSeparator)
- if (props.containsKey("headers.key.separator"))
- headersKeySeparator = props.get("headers.key.separator").toString
- if (props.containsKey("ignore.error"))
- ignoreError =
props.get("ignore.error").toString.trim.equalsIgnoreCase("true")
- if (headersDelimiter == headersSeparator)
- throw new KafkaException("headers.delimiter and headers.separator may
not be equal")
- if (headersDelimiter == headersKeySeparator)
- throw new KafkaException("headers.delimiter and headers.key.separator
may not be equal")
- if (headersSeparator == headersKeySeparator)
- throw new KafkaException("headers.separator and headers.key.separator
may not be equal")
- if (props.containsKey("null.marker"))
- nullMarker = props.get("null.marker").toString
- if (nullMarker == keySeparator)
- throw new KafkaException("null.marker and key.separator may not be
equal")
- if (nullMarker == headersSeparator)
- throw new KafkaException("null.marker and headers.separator may not be
equal")
- if (nullMarker == headersDelimiter)
- throw new KafkaException("null.marker and headers.delimiter may not be
equal")
- if (nullMarker == headersKeySeparator)
- throw new KafkaException("null.marker and headers.key.separator may
not be equal")
- }
-
- override def readRecords(inputStream: InputStream):
java.util.Iterator[ProducerRecord[Array[Byte], Array[Byte]]] =
- new java.util.Iterator[ProducerRecord[Array[Byte], Array[Byte]]] {
- private[this] val reader = new BufferedReader(new
InputStreamReader(inputStream, StandardCharsets.UTF_8))
- private[this] var current: ProducerRecord[Array[Byte], Array[Byte]] = _
- override def hasNext: Boolean =
- if (current != null) true
- else {
- lineNumber += 1
- if (printPrompt) print(">")
- val line = reader.readLine()
- current = line match {
- case null => null
- case line =>
- val headers = parse(parseHeaders, line, 0, headersDelimiter,
"headers delimiter")
- val headerOffset = if (headers == null) 0 else headers.length
+ headersDelimiter.length
-
- val key = parse(parseKey, line, headerOffset, keySeparator,
"key separator")
- val keyOffset = if (key == null) 0 else key.length +
keySeparator.length
-
- val value = line.substring(headerOffset + keyOffset)
-
- val record = new ProducerRecord[Array[Byte], Array[Byte]](
- topic,
- if (key != null && key != nullMarker)
key.getBytes(StandardCharsets.UTF_8) else null,
- if (value != null && value != nullMarker)
value.getBytes(StandardCharsets.UTF_8) else null,
- )
-
- if (headers != null && headers != nullMarker) {
- splitHeaders(headers)
- .foreach(header => record.headers.add(header._1,
header._2))
- }
- record
- }
- current != null
- }
-
- override def next(): ProducerRecord[Array[Byte], Array[Byte]] = if
(!hasNext) throw new NoSuchElementException("no more record")
- else try current finally current = null
- }
-
-
- private def parse(enabled: Boolean, line: String, startIndex: Int,
demarcation: String, demarcationName: String): String = {
- (enabled, line.indexOf(demarcation, startIndex)) match {
- case (false, _) => null
- case (_, -1) =>
- if (ignoreError) null
- else throw new KafkaException(s"No $demarcationName found on line
number $lineNumber: '$line'")
- case (_, index) => line.substring(startIndex, index)
- }
- }
-
- private def splitHeaders(headers: String): Array[(String, Array[Byte])] = {
- headersSeparatorPattern.split(headers).map { pair =>
- (pair.indexOf(headersKeySeparator), ignoreError) match {
- case (-1, false) => throw new KafkaException(s"No header key
separator found in pair '$pair' on line number $lineNumber")
- case (-1, true) => (pair, null)
- case (i, _) =>
- val headerKey = pair.substring(0, i) match {
- case k if k == nullMarker =>
- throw new KafkaException(s"Header keys should not be equal to
the null marker '$nullMarker' as they can't be null")
- case k => k
- }
- val headerValue = pair.substring(i + headersKeySeparator.length)
match {
- case v if v == nullMarker => null
- case v => v.getBytes(StandardCharsets.UTF_8)
- }
- (headerKey, headerValue)
- }
- }
- }
- }
-}
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 5c076c7f746..7831ee64d1e 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -20,24 +20,10 @@ import joptsimple.OptionParser
import org.apache.kafka.server.util.CommandLineUtils
object ToolsUtils {
-
- def validatePortOrDie(parser: OptionParser, hostPort: String): Unit = {
- val hostPorts: Array[String] = if (hostPort.contains(','))
- hostPort.split(",")
- else
- Array(hostPort)
- val validHostPort = hostPorts.filter { hostPortData =>
- org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
- }
- val isValid = validHostPort.nonEmpty && validHostPort.length ==
hostPorts.length
- if (!isValid)
- CommandLineUtils.printUsageAndExit(parser, "Please provide valid
host:port like host1:9091,host2:9092\n ")
- }
-
/**
* This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
* It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`.
- * Can be removed once [[kafka.tools.ConsoleProducer]] are migrated.
+ * Can be removed once ZooKeeper related code are deleted.
*
* @param parser Command line options parser.
* @param message Error message.
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
deleted file mode 100644
index ab27ffc9595..00000000000
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import kafka.tools.ConsoleProducer.LineMessageReader
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.producer.{Producer, ProducerConfig,
ProducerRecord}
-import org.apache.kafka.common.utils.Exit
-import org.apache.kafka.tools.api.RecordReader
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
-import org.junit.jupiter.api.Test
-import org.mockito.Mockito
-
-import java.io.InputStream
-import java.util
-import java.util.Properties
-
-class ConsoleProducerTest {
-
- val brokerListValidArgs: Array[String] = Array(
- "--broker-list",
- "localhost:1001,localhost:1002",
- "--topic",
- "t3",
- "--property",
- "parse.key=true",
- "--property",
- "key.separator=#"
- )
- val bootstrapServerValidArgs: Array[String] = Array(
- "--bootstrap-server",
- "localhost:1003,localhost:1004",
- "--topic",
- "t3",
- "--property",
- "parse.key=true",
- "--property",
- "key.separator=#"
- )
- val invalidArgs: Array[String] = Array(
- "--t", // not a valid argument
- "t3"
- )
- val bootstrapServerOverride: Array[String] = Array(
- "--broker-list",
- "localhost:1001",
- "--bootstrap-server",
- "localhost:1002",
- "--topic",
- "t3",
- )
- val clientIdOverride: Array[String] = Array(
- "--broker-list",
- "localhost:1001",
- "--topic",
- "t3",
- "--producer-property",
- "client.id=producer-1"
- )
- val batchSizeOverriddenByMaxPartitionMemoryBytesValue: Array[String] = Array(
- "--broker-list",
- "localhost:1001",
- "--bootstrap-server",
- "localhost:1002",
- "--topic",
- "t3",
- "--batch-size",
- "123",
- "--max-partition-memory-bytes",
- "456"
- )
- val btchSizeSetAndMaxPartitionMemoryBytesNotSet: Array[String] = Array(
- "--broker-list",
- "localhost:1001",
- "--bootstrap-server",
- "localhost:1002",
- "--topic",
- "t3",
- "--batch-size",
- "123"
- )
- val batchSizeNotSetAndMaxPartitionMemoryBytesSet: Array[String] = Array(
- "--broker-list",
- "localhost:1001",
- "--bootstrap-server",
- "localhost:1002",
- "--topic",
- "t3",
- "--max-partition-memory-bytes",
- "456"
- )
- val batchSizeDefault: Array[String] = Array(
- "--broker-list",
- "localhost:1001",
- "--bootstrap-server",
- "localhost:1002",
- "--topic",
- "t3"
- )
-
- @Test
- def testValidConfigsBrokerList(): Unit = {
- val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals(util.Arrays.asList("localhost:1001", "localhost:1002"),
- producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
- }
-
- @Test
- def testValidConfigsBootstrapServer(): Unit = {
- val config = new ConsoleProducer.ProducerConfig(bootstrapServerValidArgs)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals(util.Arrays.asList("localhost:1003", "localhost:1004"),
- producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
- }
-
- @Test
- def testInvalidConfigs(): Unit = {
- Exit.setExitProcedure((_, message) => throw new
IllegalArgumentException(message))
- try assertThrows(classOf[IllegalArgumentException], () => new
ConsoleProducer.ProducerConfig(invalidArgs))
- finally Exit.resetExitProcedure()
- }
-
- @Test
- def testParseKeyProp(): Unit = {
- val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
- val reader =
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
-
reader.configure(ConsoleProducer.getReaderProps(config).asInstanceOf[java.util.Map[String,
_]])
- assertTrue(reader.keySeparator == "#")
- assertTrue(reader.parseKey)
- }
-
- @Test
- def testParseReaderConfigFile(): Unit = {
- val propsFile = TestUtils.tempPropertiesFile(Map("parse.key" -> "true",
"key.separator" -> "|"))
-
- val args = Array(
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--property", "key.separator=;",
- "--property", "parse.headers=true",
- "--reader-config", propsFile.getAbsolutePath
- )
- val config = new ConsoleProducer.ProducerConfig(args)
- val reader =
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
-
reader.configure(ConsoleProducer.getReaderProps(config).asInstanceOf[java.util.Map[String,
_]])
- assertEquals(";", reader.keySeparator)
- assertTrue(reader.parseKey)
- assertTrue(reader.parseHeaders)
- }
-
- @Test
- def testBootstrapServerOverride(): Unit = {
- val config = new ConsoleProducer.ProducerConfig(bootstrapServerOverride)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals(util.Arrays.asList("localhost:1002"),
- producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
- }
-
- @Test
- def testClientIdOverride(): Unit = {
- val config = new ConsoleProducer.ProducerConfig(clientIdOverride)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals("producer-1",
- producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
- }
-
- @Test
- def testDefaultClientId(): Unit = {
- val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals("console-producer",
- producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
- }
-
- @Test
- def testBatchSizeOverriddenByMaxPartitionMemoryBytesValue(): Unit = {
- val config = new
ConsoleProducer.ProducerConfig(batchSizeOverriddenByMaxPartitionMemoryBytesValue)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals(456,
- producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
- }
-
- @Test
- def testBatchSizeSetAndMaxPartitionMemoryBytesNotSet(): Unit = {
- val config = new
ConsoleProducer.ProducerConfig(btchSizeSetAndMaxPartitionMemoryBytesNotSet)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals(123,
- producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
- }
-
- @Test
- def testDefaultBatchSize(): Unit = {
- val config = new ConsoleProducer.ProducerConfig(batchSizeDefault)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals(16*1024,
- producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
- }
-
- @Test
- def testBatchSizeNotSetAndMaxPartitionMemoryBytesSet (): Unit = {
- val config = new
ConsoleProducer.ProducerConfig(batchSizeNotSetAndMaxPartitionMemoryBytesSet)
- val producerConfig = new
ProducerConfig(ConsoleProducer.producerProps(config))
- assertEquals(456,
- producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG))
- }
-
- @Test
- def testNewReader(): Unit = {
- ConsoleProducerTest.configureCount = 0
- ConsoleProducerTest.closeCount = 0
- val reader1 =
ConsoleProducer.newReader(classOf[ConsoleProducerTest.TestRecordReader].getName,
new Properties())
- assertEquals(1, ConsoleProducerTest.configureCount)
- assertEquals(0, ConsoleProducerTest.closeCount)
- reader1.close()
- assertEquals(1, ConsoleProducerTest.closeCount)
- }
-
- @Test
- def testLoopReader(): Unit = {
- ConsoleProducerTest.configureCount = 0
- ConsoleProducerTest.closeCount = 0
- val reader =
ConsoleProducer.newReader(classOf[ConsoleProducerTest.TestRecordReader].getName,
new Properties())
-
- ConsoleProducer.loopReader(Mockito.mock(classOf[Producer[Array[Byte],
Array[Byte]]]),
- reader, System.in, false)
-
- assertEquals(1, ConsoleProducerTest.configureCount)
- assertEquals(1, ConsoleProducerTest.closeCount)
- }
-}
-
-object ConsoleProducerTest {
- var configureCount = 0
- var closeCount = 0
-
- class TestRecordReader extends RecordReader {
- override def configure(configs: util.Map[String, _]): Unit =
configureCount += 1
- override def readRecords(inputStream: InputStream):
java.util.Iterator[ProducerRecord[Array[Byte], Array[Byte]]] =
- java.util.Collections.emptyIterator()
-
- override def close(): Unit = closeCount += 1
- }
-}
diff --git a/core/src/test/scala/unit/kafka/tools/LineMessageReaderTest.scala b/core/src/test/scala/unit/kafka/tools/LineMessageReaderTest.scala
deleted file mode 100644
index 740f42762de..00000000000
--- a/core/src/test/scala/unit/kafka/tools/LineMessageReaderTest.scala
+++ /dev/null
@@ -1,354 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License") you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.tools
-
-import kafka.tools.ConsoleProducer.LineMessageReader
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.KafkaException
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows}
-import org.junit.jupiter.api.Test
-
-import java.io.ByteArrayInputStream
-import java.util.Properties
-
-class LineMessageReaderTest {
-
- private def defaultTestProps = {
- val props = new Properties
- props.put("topic", "topic")
- props.put("parse.key", "true")
- props.put("parse.headers", "true")
- props
- }
-
- @Test
- def testLineReader(): Unit = {
- val input = "key0\tvalue0\nkey1\tvalue1"
-
- val props = defaultTestProps
- props.put("parse.headers", "false")
-
- runTest(props, input, record("key0", "value0"), record("key1", "value1"))
- }
-
- @Test
- def testLineReaderHeader(): Unit = {
- val input =
"headerKey0:headerValue0,headerKey1:headerValue1\tkey0\tvalue0\n"
- val expected = record("key0", "value0", List("headerKey0" ->
"headerValue0", "headerKey1" -> "headerValue1"))
- runTest(defaultTestProps, input, expected)
- }
-
- @Test
- def testMinimalValidInputWithHeaderKeyAndValue(): Unit = {
- runTest(defaultTestProps, ":\t\t", record("", "", List("" -> "")))
- }
-
- @Test
- def testKeyMissingValue(): Unit = {
- val props = defaultTestProps
- props.put("parse.headers", "false")
- runTest(props, "key\t", record("key", ""))
- }
-
- @Test
- def testDemarcationsLongerThanOne(): Unit = {
- val props = defaultTestProps
- props.put("key.separator", "\t\t")
- props.put("headers.delimiter", "\t\t")
- props.put("headers.separator", "---")
- props.put("headers.key.separator", "::::")
-
- runTest(
- props,
- "headerKey0.0::::headerValue0.0---headerKey1.0::::\t\tkey\t\tvalue",
- record("key", "value", List("headerKey0.0" -> "headerValue0.0",
"headerKey1.0"-> ""))
- )
- }
-
- @Test
- def testLineReaderHeaderNoKey(): Unit = {
- val input = "headerKey:headerValue\tvalue\n"
-
- val props = defaultTestProps
- props.put("parse.key", "false")
-
- runTest(props, input, record(null, "value", List("headerKey" ->
"headerValue")))
- }
-
- @Test
- def testLineReaderOnlyValue(): Unit = {
- val props = defaultTestProps
- props.put("parse.key", "false")
- props.put("parse.headers", "false")
-
- runTest(props, "value\n", record(null, "value"))
- }
-
- @Test
- def
testParseHeaderEnabledWithCustomDelimiterAndVaryingNumberOfKeyValueHeaderPairs():
Unit = {
- val props = defaultTestProps
- props.put("key.separator", "#")
- props.put("headers.delimiter", "!")
- props.put("headers.separator", "&")
- props.put("headers.key.separator", ":")
-
- val input =
- "headerKey0.0:headerValue0.0&headerKey0.1:headerValue0.1!key0#value0\n" +
- "headerKey1.0:headerValue1.0!key1#value1"
-
- val record0 = record("key0", "value0", List("headerKey0.0" ->
"headerValue0.0", "headerKey0.1" -> "headerValue0.1"))
- val record1 = record("key1", "value1", List("headerKey1.0" ->
"headerValue1.0"))
-
- runTest(props, input, record0, record1)
- }
-
- @Test
- def testMissingKeySeparator(): Unit = {
- val lineReader = new LineMessageReader
- val input =
-
"headerKey0.0:headerValue0.0,headerKey0.1:headerValue0.1\tkey0\tvalue0\n" +
- "headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1"
-
- lineReader.configure(defaultTestProps.asInstanceOf[java.util.Map[String,
_]])
- val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
- iter.next()
-
- val expectedException = assertThrows(classOf[KafkaException], () =>
iter.next())
-
- assertEquals(
- "No key separator found on line number 2:
'headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1'",
- expectedException.getMessage
- )
- }
-
- @Test
- def testMissingHeaderKeySeparator(): Unit = {
- val lineReader = new LineMessageReader()
- val input = "key[MISSING-DELIMITER]val\tkey0\tvalue0\n"
- lineReader.configure(defaultTestProps.asInstanceOf[java.util.Map[String,
_]])
- val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
-
- val expectedException = assertThrows(classOf[KafkaException], () =>
iter.next())
-
- assertEquals(
- "No header key separator found in pair 'key[MISSING-DELIMITER]val' on
line number 1",
- expectedException.getMessage
- )
- }
-
- @Test
- def testHeaderDemarcationCollision(): Unit = {
- val props = defaultTestProps
- props.put("headers.delimiter", "\t")
- props.put("headers.separator", "\t")
- props.put("headers.key.separator", "\t")
-
- assertThrowsOnInvalidPatternConfig(props, "headers.delimiter and
headers.separator may not be equal")
-
- props.put("headers.separator", ",")
- assertThrowsOnInvalidPatternConfig(props, "headers.delimiter and
headers.key.separator may not be equal")
-
- props.put("headers.key.separator", ",")
- assertThrowsOnInvalidPatternConfig(props, "headers.separator and
headers.key.separator may not be equal")
- }
-
- private def assertThrowsOnInvalidPatternConfig(props: Properties,
expectedMessage: String): Unit = {
- val exception = assertThrows(classOf[KafkaException], () => new
LineMessageReader().configure(props.asInstanceOf[java.util.Map[String, _]]))
- assertEquals(
- expectedMessage,
- exception.getMessage
- )
- }
-
- @Test
- def testIgnoreErrorInInput(): Unit = {
- val input =
- "headerKey0.0:headerValue0.0\tkey0\tvalue0\n" +
-
"headerKey1.0:headerValue1.0,headerKey1.1:headerValue1.1[MISSING-HEADER-DELIMITER]key1\tvalue1\n"
+
- "headerKey2.0:headerValue2.0\tkey2[MISSING-KEY-DELIMITER]value2\n" +
-
"headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3\n"
-
- val props = defaultTestProps
- props.put("ignore.error", "true")
-
- val validRecord = record("key0", "value0", List("headerKey0.0" ->
"headerValue0.0"))
-
- val missingHeaderDelimiter: ProducerRecord[String, String] =
- record(
- null,
- "value1",
- List("headerKey1.0" -> "headerValue1.0", "headerKey1.1" ->
"headerValue1.1[MISSING-HEADER-DELIMITER]key1")
- )
-
- val missingKeyDelimiter: ProducerRecord[String, String] =
- record(
- null,
- "key2[MISSING-KEY-DELIMITER]value2",
- List("headerKey2.0" -> "headerValue2.0")
- )
-
- val missingKeyHeaderDelimiter: ProducerRecord[String, String] =
- record(
- null,
-
"headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3",
- List()
- )
-
- runTest(props, input, validRecord, missingHeaderDelimiter,
missingKeyDelimiter, missingKeyHeaderDelimiter)
- }
-
- @Test
- def testMalformedHeaderIgnoreError(): Unit = {
- val input = "key-val\tkey0\tvalue0\n"
-
- val props = defaultTestProps
- props.put("ignore.error", "true")
-
- val expected = record("key0", "value0", List("key-val" -> null))
-
- runTest(props, input, expected)
- }
-
- @Test
- def testNullMarker(): Unit = {
- val input =
- "key\t\n" +
- "key\t<NULL>\n" +
- "key\t<NULL>value\n" +
- "<NULL>\tvalue\n" +
- "<NULL>\t<NULL>"
-
- val props = defaultTestProps
- props.put("null.marker", "<NULL>")
- props.put("parse.headers", "false")
- runTest(props, input,
- record("key", ""),
- record("key", null),
- record("key", "<NULL>value"),
- record(null, "value"),
- record(null, null))
-
- // If the null marker is not set
- props.remove("null.marker")
- runTest(props, input,
- record("key", ""),
- record("key", "<NULL>"),
- record("key", "<NULL>value"),
- record("<NULL>", "value"),
- record("<NULL>", "<NULL>"))
- }
-
- @Test
- def testNullMarkerWithHeaders(): Unit = {
- val input =
- "h0:v0,h1:v1\t<NULL>\tvalue\n" +
- "<NULL>\tkey\t<NULL>\n" +
- "h0:,h1:v1\t<NULL>\t<NULL>\n" +
- "h0:<NULL>,h1:v1\tkey\t<NULL>\n" +
- "h0:<NULL>,h1:<NULL>value\tkey\t<NULL>\n"
- val header = "h1" -> "v1"
-
- val props = defaultTestProps
- props.put("null.marker", "<NULL>")
- runTest(props, input,
- record(null, "value", List("h0" -> "v0", header)),
- record("key", null),
- record(null, null, List("h0" -> "", header)),
- record("key", null, List("h0" -> null, header)),
- record("key", null, List("h0" -> null, "h1" -> "<NULL>value")))
-
- // If the null marker is not set
- val lineReader = new LineMessageReader()
- props.remove("null.marker")
- lineReader.configure(props.asInstanceOf[java.util.Map[String, _]])
- val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
- assertRecordEquals(record("<NULL>", "value", List("h0" -> "v0", header)),
iter.next())
- // line 2 is not valid anymore
- val expectedException = assertThrows(classOf[KafkaException], () =>
iter.next())
- assertEquals(
- "No header key separator found in pair '<NULL>' on line number 2",
- expectedException.getMessage
- )
- assertRecordEquals(record("<NULL>", "<NULL>", List("h0" -> "", header)),
iter.next())
- assertRecordEquals(record("key", "<NULL>", List("h0" -> "<NULL>",
header)), iter.next())
- assertRecordEquals(record("key", "<NULL>", List("h0" -> "<NULL>", "h1" ->
"<NULL>value")), iter.next())
- }
-
- @Test
- def testNullMarkerHeaderKeyThrows(): Unit = {
- val input = "<NULL>:v0,h1:v1\tkey\tvalue\n"
-
- val props = defaultTestProps
- props.put("null.marker", "<NULL>")
- val lineReader = new LineMessageReader()
- lineReader.configure(props.asInstanceOf[java.util.Map[String, _]])
- val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
- val expectedException = assertThrows(classOf[KafkaException], () =>
iter.next())
- assertEquals(
- "Header keys should not be equal to the null marker '<NULL>' as they
can't be null",
- expectedException.getMessage
- )
-
- // If the null marker is not set
- props.remove("null.marker")
- runTest(props, input, record("key", "value", List("<NULL>" -> "v0", "h1"
-> "v1")))
- }
-
- @Test
- def testInvalidNullMarker(): Unit = {
- val props = defaultTestProps
- props.put("headers.delimiter", "-")
- props.put("headers.separator", ":")
- props.put("headers.key.separator", "/")
-
- props.put("null.marker", "-")
- assertThrowsOnInvalidPatternConfig(props, "null.marker and
headers.delimiter may not be equal")
-
- props.put("null.marker", ":")
- assertThrowsOnInvalidPatternConfig(props, "null.marker and
headers.separator may not be equal")
-
- props.put("null.marker", "/")
- assertThrowsOnInvalidPatternConfig(props, "null.marker and
headers.key.separator may not be equal")
- }
-
- def runTest(props: Properties, input: String, expectedRecords:
ProducerRecord[String, String]*): Unit = {
- val lineReader = new LineMessageReader
- lineReader.configure(props.asInstanceOf[java.util.Map[String, _]])
- val iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes))
- expectedRecords.foreach(r => assertRecordEquals(r, iter.next()))
- assertFalse(iter.hasNext)
- assertThrows(classOf[NoSuchElementException], () => iter.next())
- }
-
- // The equality method of ProducerRecord compares memory references for the
header iterator, this is why this custom equality check is used.
- private def assertRecordEquals[K, V](expected: ProducerRecord[K, V], actual:
ProducerRecord[Array[Byte], Array[Byte]]): Unit = {
- assertEquals(expected.key, if (actual.key == null) null else new
String(actual.key))
- assertEquals(expected.value, if (actual.value == null) null else new
String(actual.value))
- assertEquals(expected.headers.toArray.toList,
actual.headers.toArray.toList)
- }
-
- private def record[K, V](key: K, value: V, headers: List[(String, String)]):
ProducerRecord[K, V] = {
- val record = new ProducerRecord("topic", key, value)
- headers.foreach(h => record.headers.add(h._1, if (h._2 != null)
h._2.getBytes else null))
- record
- }
-
- private def record[K, V](key: K, value: V): ProducerRecord[K, V] = {
- new ProducerRecord("topic", key, value)
- }
-}
diff --git a/docker/native/native-image-configs/reflect-config.json
b/docker/native/native-image-configs/reflect-config.json
index 838e35d6f9b..dc7a219be4f 100644
--- a/docker/native/native-image-configs/reflect-config.json
+++ b/docker/native/native-image-configs/reflect-config.json
@@ -716,7 +716,7 @@
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
- "name":"kafka.tools.ConsoleProducer$LineMessageReader",
+ "name":"org.apache.kafka.tools.LineMessageReader",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
new file mode 100644
index 00000000000..23da5b1b2d3
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.tools.api.RecordReader;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.METADATA_MAX_AGE_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.SEND_BUFFER_CONFIG;
+import static
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.loadProps;
+import static org.apache.kafka.common.utils.Utils.propsToStringMap;
+import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
+
+public class ConsoleProducer {
+ public static void main(String[] args) {
+ ConsoleProducer consoleProducer = new ConsoleProducer();
+ consoleProducer.start(args);
+ }
+
+ void start(String[] args) {
+ try {
+ ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
+ RecordReader reader = messageReader(opts);
+ KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(opts.producerProps());
+
+ Exit.addShutdownHook("producer-shutdown-hook", producer::close);
+
+ loopReader(producer, reader, opts.sync());
+ } catch (OptionException e) {
+ System.err.println(e.getMessage());
+ Exit.exit(1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ Exit.exit(1);
+ }
+ Exit.exit(0);
+ }
+
+ // Visible for testing
+ RecordReader messageReader(ConsoleProducerOptions opts) throws Exception {
+ Object objReader =
Class.forName(opts.readerClass()).getDeclaredConstructor().newInstance();
+
+ if (objReader instanceof RecordReader) {
+ RecordReader reader = (RecordReader) objReader;
+ reader.configure(opts.readerProps());
+
+ return reader;
+ }
+
+ throw new IllegalArgumentException("The reader must implement " +
RecordReader.class.getName() + " interface");
+ }
+
+ // Visible for testing
+ void loopReader(Producer<byte[], byte[]> producer, RecordReader reader,
boolean sync) throws Exception {
+ Iterator<ProducerRecord<byte[], byte[]>> iter =
reader.readRecords(System.in);
+ try {
+ while (iter.hasNext()) {
+ send(producer, iter.next(), sync);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ private void send(Producer<byte[], byte[]> producer,
ProducerRecord<byte[], byte[]> record, boolean sync) throws Exception {
+ if (sync) {
+ producer.send(record).get();
+ } else {
+ producer.send(record, new ErrorLoggingCallback(record.topic(),
record.key(), record.value(), false));
+ }
+ }
+
+ static final class ConsoleProducerOptions extends CommandDefaultOptions {
+ private final OptionSpec<String> topicOpt;
+ private final OptionSpec<String> bootstrapServerOpt;
+ private final OptionSpec<Void> syncOpt;
+ private final OptionSpec<String> compressionCodecOpt;
+ private final OptionSpec<Integer> batchSizeOpt;
+ private final OptionSpec<Integer> messageSendMaxRetriesOpt;
+ private final OptionSpec<Long> retryBackoffMsOpt;
+ private final OptionSpec<Long> sendTimeoutOpt;
+ private final OptionSpec<String> requestRequiredAcksOpt;
+ private final OptionSpec<Integer> requestTimeoutMsOpt;
+ private final OptionSpec<Long> metadataExpiryMsOpt;
+ private final OptionSpec<Long> maxBlockMsOpt;
+ private final OptionSpec<Long> maxMemoryBytesOpt;
+ private final OptionSpec<Integer> maxPartitionMemoryBytesOpt;
+ private final OptionSpec<String> messageReaderOpt;
+ private final OptionSpec<Integer> socketBufferSizeOpt;
+ private final OptionSpec<String> propertyOpt;
+ private final OptionSpec<String> readerConfigOpt;
+ private final OptionSpec<String> producerPropertyOpt;
+ private final OptionSpec<String> producerConfigOpt;
+
+ public ConsoleProducerOptions(String[] args) {
+ super(args);
+ topicOpt = parser.accepts("topic", "REQUIRED: The topic name to
produce messages to.")
+ .withRequiredArg()
+ .describedAs("topic")
+ .ofType(String.class);
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED:
The server(s) to connect to. The broker list string in the form
HOST1:PORT1,HOST2:PORT2.")
+ .withRequiredArg()
+ .describedAs("server to connect to")
+ .ofType(String.class);
+ syncOpt = parser.accepts("sync", "If set message send requests to
the brokers are synchronously, one at a time as they arrive.");
+ compressionCodecOpt = parser.accepts("compression-codec", "The
compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." +
+ "If specified without value, then it defaults to
'gzip'")
+ .withOptionalArg()
+ .describedAs("compression-codec")
+ .ofType(String.class);
+ batchSizeOpt = parser.accepts("batch-size", "Number of messages to
send in a single batch if they are not being sent synchronously. " +
+ "please note that this option will be replaced if
max-partition-memory-bytes is also set")
+ .withRequiredArg()
+ .describedAs("size")
+ .ofType(Integer.class)
+ .defaultsTo(16 * 1024);
+ messageSendMaxRetriesOpt =
parser.accepts("message-send-max-retries", "Brokers can fail receiving the
message for multiple reasons, " +
+ "and being unavailable transiently is just one of
them. This property specifies the number of retries before the producer give up
and drop this message. " +
+ "This is the option to control `retries` in
producer configs.")
+ .withRequiredArg()
+ .ofType(Integer.class)
+ .defaultsTo(3);
+ retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before
each retry, the producer refreshes the metadata of relevant topics. " +
+ "Since leader election takes a bit of time, this
property specifies the amount of time that the producer waits before refreshing
the metadata. " +
+ "This is the option to control `retry.backoff.ms`
in producer configs.")
+ .withRequiredArg()
+ .ofType(Long.class)
+ .defaultsTo(100L);
+ sendTimeoutOpt = parser.accepts("timeout", "If set and the
producer is running in asynchronous mode, this gives the maximum amount of
time" +
+ " a message will queue awaiting sufficient batch
size. The value is given in ms. " +
+ "This is the option to control `linger.ms` in
producer configs.")
+ .withRequiredArg()
+ .describedAs("timeout_ms")
+ .ofType(Long.class)
+ .defaultsTo(1000L);
+ requestRequiredAcksOpt = parser.accepts("request-required-acks",
"The required `acks` of the producer requests")
+ .withRequiredArg()
+ .describedAs("request required acks")
+ .ofType(String.class)
+ .defaultsTo("-1");
+ requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The
ack timeout of the producer requests. Value must be non-negative and non-zero.")
+ .withRequiredArg()
+ .describedAs("request timeout ms")
+ .ofType(Integer.class)
+ .defaultsTo(1500);
+ metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
+ "The period of time in milliseconds after which we
force a refresh of metadata even if we haven't seen any leadership changes. " +
+ "This is the option to control
`metadata.max.age.ms` in producer configs.")
+ .withRequiredArg()
+ .describedAs("metadata expiration interval")
+ .ofType(Long.class)
+ .defaultsTo(5 * 60 * 1000L);
+ maxBlockMsOpt = parser.accepts("max-block-ms",
+ "The max time that the producer will block for
during a send request.")
+ .withRequiredArg()
+ .describedAs("max block on send")
+ .ofType(Long.class)
+ .defaultsTo(60 * 1000L);
+ maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
+ "The total memory used by the producer to buffer
records waiting to be sent to the server. " +
+ "This is the option to control
`buffer.memory` in producer configs.")
+ .withRequiredArg()
+ .describedAs("total memory in bytes")
+ .ofType(Long.class)
+ .defaultsTo(32 * 1024 * 1024L);
+ maxPartitionMemoryBytesOpt =
parser.accepts("max-partition-memory-bytes",
+ "The buffer size allocated for a partition. When
records are received which are smaller than this size the producer " +
+ "will attempt to optimistically group them
together until this size is reached. " +
+ "This is the option to control
`batch.size` in producer configs.")
+ .withRequiredArg()
+ .describedAs("memory in bytes per partition")
+ .ofType(Integer.class)
+ .defaultsTo(16 * 1024);
+ messageReaderOpt = parser.accepts("line-reader", "The class name
of the class to use for reading lines from standard in. " +
+ "By default each line is read as a separate
message.")
+ .withRequiredArg()
+ .describedAs("reader_class")
+ .ofType(String.class)
+ .defaultsTo(LineMessageReader.class.getName());
+ socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The
size of the tcp RECV size. " +
+ "This is the option to control `send.buffer.bytes`
in producer configs.")
+ .withRequiredArg()
+ .describedAs("size")
+ .ofType(Integer.class)
+ .defaultsTo(1024 * 100);
+ propertyOpt = parser.accepts("property",
+ "A mechanism to pass user-defined properties in
the form key=value to the message reader. This allows custom configuration for
a user-defined message reader." +
+ "\nDefault properties include:" +
+ "\n parse.key=false" +
+ "\n parse.headers=false" +
+ "\n ignore.error=false" +
+ "\n key.separator=\\t" +
+ "\n headers.delimiter=\\t" +
+ "\n headers.separator=," +
+ "\n headers.key.separator=:" +
+ "\n null.marker= When set, any fields
(key, value and headers) equal to this will be replaced by null" +
+ "\nDefault parsing pattern when:" +
+ "\n parse.headers=true and
parse.key=true:" +
+ "\n \"h1:v1,h2:v2...\\tkey\\tvalue\"" +
+ "\n parse.key=true:" +
+ "\n \"key\\tvalue\"" +
+ "\n parse.headers=true:" +
+ "\n \"h1:v1,h2:v2...\\tvalue\"")
+ .withRequiredArg()
+ .describedAs("prop")
+ .ofType(String.class);
+ readerConfigOpt = parser.accepts("reader-config", "Config
properties file for the message reader. Note that " + propertyOpt + " takes
precedence over this config.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
+ producerPropertyOpt = parser.accepts("producer-property", "A
mechanism to pass user-defined properties in the form key=value to the
producer. ")
+ .withRequiredArg()
+ .describedAs("producer_prop")
+ .ofType(String.class);
+ producerConfigOpt = parser.accepts("producer.config", "Producer
config properties file. Note that " + producerPropertyOpt + " takes precedence
over this config.")
+ .withRequiredArg()
+ .describedAs("config file")
+ .ofType(String.class);
+
+ try {
+ options = parser.parse(args);
+ } catch (OptionException e) {
+ CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+ }
+
+ checkArgs();
+ }
+
+ void checkArgs() {
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
read data from standard input and publish it to Kafka.");
+
+ CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);
+
+ try {
+
ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt));
+ } catch (IllegalArgumentException e) {
+ CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+ }
+ }
+
+ boolean sync() {
+ return options.has(syncOpt);
+ }
+
+ String compressionCodec() {
+ if (options.has(compressionCodecOpt)) {
+ String codecOptValue = options.valueOf(compressionCodecOpt);
+ // Defaults to gzip if no value is provided.
+ return codecOptValue == null || codecOptValue.isEmpty() ?
CompressionType.GZIP.name : codecOptValue;
+ }
+
+ return CompressionType.NONE.name;
+ }
+
+ String readerClass() {
+ return options.valueOf(messageReaderOpt);
+ }
+
+ Map<String, String> readerProps() throws IOException {
+ Map<String, String> properties = new HashMap<>();
+
+ if (options.has(readerConfigOpt)) {
+
properties.putAll(propsToStringMap(loadProps(options.valueOf(readerConfigOpt))));
+ }
+
+ properties.put("topic", options.valueOf(topicOpt));
+
properties.putAll(propsToStringMap(parseKeyValueArgs(options.valuesOf(propertyOpt))));
+
+ return properties;
+ }
+
+ Properties producerProps() throws IOException {
+ Properties props = new Properties();
+
+ if (options.has(producerConfigOpt)) {
+ props.putAll(loadProps(options.valueOf(producerConfigOpt)));
+ }
+
+
props.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));
+ props.put(BOOTSTRAP_SERVERS_CONFIG,
options.valueOf(bootstrapServerOpt));
+ props.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
+
+ if (props.getProperty(CLIENT_ID_CONFIG) == null) {
+ props.put(CLIENT_ID_CONFIG, "console-producer");
+ }
+
+ props.put(KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
+ props.put(VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
+
+ CommandLineUtils.maybeMergeOptions(props, LINGER_MS_CONFIG,
options, sendTimeoutOpt);
+ CommandLineUtils.maybeMergeOptions(props, ACKS_CONFIG, options,
requestRequiredAcksOpt);
+ CommandLineUtils.maybeMergeOptions(props,
REQUEST_TIMEOUT_MS_CONFIG, options, requestTimeoutMsOpt);
+ CommandLineUtils.maybeMergeOptions(props, RETRIES_CONFIG, options,
messageSendMaxRetriesOpt);
+ CommandLineUtils.maybeMergeOptions(props, RETRY_BACKOFF_MS_CONFIG,
options, retryBackoffMsOpt);
+ CommandLineUtils.maybeMergeOptions(props, SEND_BUFFER_CONFIG,
options, socketBufferSizeOpt);
+ CommandLineUtils.maybeMergeOptions(props, BUFFER_MEMORY_CONFIG,
options, maxMemoryBytesOpt);
+ // We currently have 2 options to set the batch.size value. We'll
deprecate/remove one of them in KIP-717.
+ CommandLineUtils.maybeMergeOptions(props, BATCH_SIZE_CONFIG,
options, batchSizeOpt);
+ CommandLineUtils.maybeMergeOptions(props, BATCH_SIZE_CONFIG,
options, maxPartitionMemoryBytesOpt);
+ CommandLineUtils.maybeMergeOptions(props, METADATA_MAX_AGE_CONFIG,
options, metadataExpiryMsOpt);
+ CommandLineUtils.maybeMergeOptions(props, MAX_BLOCK_MS_CONFIG,
options, maxBlockMsOpt);
+
+ return props;
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/LineMessageReader.java
b/tools/src/main/java/org/apache/kafka/tools/LineMessageReader.java
new file mode 100644
index 00000000000..13f90d3745e
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/LineMessageReader.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.tools.api.RecordReader;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.regex.Pattern;
+
+import static java.util.Arrays.stream;
+
+/**
+ * The default implementation of {@link RecordReader} for the {@link
ConsoleProducer}. This reader comes with
+ * the ability to parse a record's headers, key and value based on
configurable separators. The reader configuration
+ * is defined as follows:
+ * <p></p>
+ * <pre>
+ * parse.key : indicates if a record's key is included in a
line input and needs to be parsed. (default: false).
+ * key.separator : the string separating a record's key from its
value. (default: \t).
+ * parse.headers : indicates if record headers are included in a
line input and need to be parsed. (default: false).
+ * headers.delimiter : the string separating the list of headers from
the record key. (default: \t).
+ * headers.separator : the string separating headers. (default: ,).
+ * headers.key.separator : the string separating the key and value within a
header. (default: :).
+ * ignore.error : whether best attempts should be made to ignore
parsing errors. (default: false).
+ * null.marker : record key, record value, header key, header
value which match this marker are replaced by null. (default: null).
+ * </pre>
+ */
+public class LineMessageReader implements RecordReader {
+ private String topic;
+ private boolean parseKey;
+ private String keySeparator = "\t";
+ private boolean parseHeaders;
+ private String headersDelimiter = "\t";
+ private String headersSeparator = ",";
+ private String headersKeySeparator = ":";
+ private boolean ignoreError;
+ private int lineNumber;
+ private final boolean printPrompt = System.console() != null;
+ private Pattern headersSeparatorPattern;
+ private String nullMarker;
+
+ @Override
+ public void configure(Map<String, ?> props) {
+ topic = props.get("topic").toString();
+ if (props.containsKey("parse.key"))
+ parseKey =
props.get("parse.key").toString().trim().equalsIgnoreCase("true");
+ if (props.containsKey("key.separator"))
+ keySeparator = props.get("key.separator").toString();
+ if (props.containsKey("parse.headers"))
+ parseHeaders =
props.get("parse.headers").toString().trim().equalsIgnoreCase("true");
+ if (props.containsKey("headers.delimiter"))
+ headersDelimiter = props.get("headers.delimiter").toString();
+ if (props.containsKey("headers.separator"))
+ headersSeparator = props.get("headers.separator").toString();
+ headersSeparatorPattern = Pattern.compile(headersSeparator);
+ if (props.containsKey("headers.key.separator"))
+ headersKeySeparator =
props.get("headers.key.separator").toString();
+ if (props.containsKey("ignore.error"))
+ ignoreError =
props.get("ignore.error").toString().trim().equalsIgnoreCase("true");
+ if (headersDelimiter.equals(headersSeparator))
+ throw new KafkaException("headers.delimiter and headers.separator
may not be equal");
+ if (headersDelimiter.equals(headersKeySeparator))
+ throw new KafkaException("headers.delimiter and
headers.key.separator may not be equal");
+ if (headersSeparator.equals(headersKeySeparator))
+ throw new KafkaException("headers.separator and
headers.key.separator may not be equal");
+ if (props.containsKey("null.marker"))
+ nullMarker = props.get("null.marker").toString();
+ if (keySeparator.equals(nullMarker))
+ throw new KafkaException("null.marker and key.separator may not be
equal");
+ if (headersSeparator.equals(nullMarker))
+ throw new KafkaException("null.marker and headers.separator may
not be equal");
+ if (headersDelimiter.equals(nullMarker))
+ throw new KafkaException("null.marker and headers.delimiter may
not be equal");
+ if (headersKeySeparator.equals(nullMarker))
+ throw new KafkaException("null.marker and headers.key.separator
may not be equal");
+ }
+
+ @Override
+ public Iterator<ProducerRecord<byte[], byte[]>> readRecords(InputStream
inputStream) {
+ return new Iterator<ProducerRecord<byte[], byte[]>>() {
+ private final BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream, StandardCharsets.UTF_8));
+ private ProducerRecord<byte[], byte[]> current;
+
+ @Override
+ public boolean hasNext() {
+ if (current != null) {
+ return true;
+ } else {
+ lineNumber += 1;
+ if (printPrompt) {
+ System.out.print(">");
+ }
+
+ String line;
+ try {
+ line = reader.readLine();
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+
+ if (line == null) {
+ current = null;
+ } else {
+ String headers = parse(parseHeaders, line, 0,
headersDelimiter, "headers delimiter");
+ int headerOffset = headers == null ? 0 :
headers.length() + headersDelimiter.length();
+
+ String key = parse(parseKey, line, headerOffset,
keySeparator, "key separator");
+ int keyOffset = key == null ? 0 : key.length() +
keySeparator.length();
+
+ String value = line.substring(headerOffset +
keyOffset);
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(
+ topic,
+ key != null && !key.equals(nullMarker) ?
key.getBytes(StandardCharsets.UTF_8) : null,
+ value != null && !value.equals(nullMarker) ?
value.getBytes(StandardCharsets.UTF_8) : null
+ );
+
+ if (headers != null && !headers.equals(nullMarker)) {
+ stream(splitHeaders(headers)).forEach(header ->
record.headers().add(header.key(), header.value()));
+ }
+ current = record;
+ }
+
+ return current != null;
+ }
+ }
+
+ @Override
+ public ProducerRecord<byte[], byte[]> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("no more record");
+ } else {
+ try {
+ return current;
+ } finally {
+ current = null;
+ }
+ }
+ }
+ };
+ }
+
+ private String parse(boolean enabled, String line, int startIndex, String
demarcation, String demarcationName) {
+ if (!enabled) {
+ return null;
+ }
+ int index = line.indexOf(demarcation, startIndex);
+ if (index == -1) {
+ if (ignoreError) {
+ return null;
+ }
+ throw new KafkaException("No " + demarcationName + " found on line
number " + lineNumber + ": '" + line + "'");
+ }
+ return line.substring(startIndex, index);
+ }
+
+ private Header[] splitHeaders(String headers) {
+ return stream(headersSeparatorPattern.split(headers))
+ .map(pair -> {
+ int i = pair.indexOf(headersKeySeparator);
+ if (i == -1) {
+ if (ignoreError) {
+ return new RecordHeader(pair, null);
+ }
+ throw new KafkaException("No header key separator
found in pair '" + pair + "' on line number " + lineNumber);
+ }
+
+ String headerKey = pair.substring(0, i);
+ if (headerKey.equals(nullMarker)) {
+ throw new KafkaException("Header keys should not be
equal to the null marker '" + nullMarker + "' as they can't be null");
+ }
+
+ String value = pair.substring(i +
headersKeySeparator.length());
+ byte[] headerValue = value.equals(nullMarker) ? null :
value.getBytes(StandardCharsets.UTF_8);
+ return new RecordHeader(headerKey, headerValue);
+ }).toArray(Header[]::new);
+ }
+
+ // Visible for testing
+ String keySeparator() {
+ return keySeparator;
+ }
+
+ // Visible for testing
+ boolean parseKey() {
+ return parseKey;
+ }
+
+ // Visible for testing
+ boolean parseHeaders() {
+ return parseHeaders;
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
index bbc30bea01e..57239ee4b89 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java
@@ -161,8 +161,7 @@ public class ToolsUtils {
/**
* This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
* It is needed for tools migration (KAFKA-14525), as there is no Java
equivalent for return type `Nothing`.
- * Can be removed once [[kafka.tools.ConsoleConsumer]]
- * and [[kafka.tools.ConsoleProducer]] are migrated.
+ * Can be removed once [[kafka.tools.ConsoleConsumer]] is migrated.
*
* @param parser Command line options parser.
* @param message Error message.
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
new file mode 100644
index 00000000000..3804224ef17
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.utils.TestUtils;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.tools.ConsoleProducer.ConsoleProducerOptions;
+import org.apache.kafka.tools.api.RecordReader;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link ConsoleProducer}: translation of command-line options into producer and reader
+ * properties, and the configure/close life cycle of the selected {@link RecordReader}.
+ */
+public class ConsoleProducerTest {
+    private static final String[] BOOTSTRAP_SERVER_VALID_ARGS = new String[]{
+        "--bootstrap-server", "localhost:1003,localhost:1004",
+        "--topic", "t3",
+        "--property", "parse.key=true",
+        "--property", "key.separator=#"
+    };
+    private static final String[] INVALID_ARGS = new String[]{
+        "--t", // not a valid argument
+        "t3"
+    };
+    private static final String[] BOOTSTRAP_SERVER_OVERRIDE = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+    };
+    private static final String[] CLIENT_ID_OVERRIDE = new String[]{
+        "--bootstrap-server", "localhost:1001",
+        "--topic", "t3",
+        "--producer-property", "client.id=producer-1"
+    };
+    private static final String[] BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+        "--batch-size", "123",
+        "--max-partition-memory-bytes", "456"
+    };
+    private static final String[] BATCH_SIZE_SET_AND_MAX_PARTITION_MEMORY_BYTES_NOT_SET = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+        "--batch-size", "123"
+    };
+    private static final String[] BATCH_SIZE_NOT_SET_AND_MAX_PARTITION_MEMORY_BYTES_SET = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+        "--max-partition-memory-bytes", "456"
+    };
+    private static final String[] BATCH_SIZE_DEFAULT = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3"
+    };
+    private static final String[] TEST_RECORD_READER = new String[]{
+        "--bootstrap-server", "localhost:1002",
+        "--topic", "t3",
+        "--line-reader", TestRecordReader.class.getName()
+    };
+
+    @Test
+    public void testValidConfigsBootstrapServer() throws IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(BOOTSTRAP_SERVER_VALID_ARGS);
+        ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
+
+        assertEquals(asList("localhost:1003", "localhost:1004"), producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    }
+
+    @Test
+    public void testInvalidConfigs() {
+        // Turn the exit call made on bad arguments into an exception the test can observe.
+        Exit.setExitProcedure((statusCode, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new ConsoleProducerOptions(INVALID_ARGS));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void testParseKeyProp() throws ReflectiveOperationException, IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(BOOTSTRAP_SERVER_VALID_ARGS);
+        LineMessageReader reader = (LineMessageReader) Class.forName(opts.readerClass()).getDeclaredConstructor().newInstance();
+        reader.configure(opts.readerProps());
+
+        assertEquals("#", reader.keySeparator());
+        assertTrue(reader.parseKey());
+    }
+
+    @Test
+    public void testParseReaderConfigFile() throws Exception {
+        File propsFile = TestUtils.tempFile();
+        // try-with-resources closes the stream even if one of the writes fails.
+        try (OutputStream propsStream = Files.newOutputStream(propsFile.toPath())) {
+            propsStream.write("parse.key=true\n".getBytes());
+            propsStream.write("key.separator=|".getBytes());
+        }
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--property", "key.separator=;",
+            "--property", "parse.headers=true",
+            "--reader-config", propsFile.getAbsolutePath()
+        };
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
+        LineMessageReader reader = (LineMessageReader) Class.forName(opts.readerClass()).getDeclaredConstructor().newInstance();
+        reader.configure(opts.readerProps());
+
+        // key.separator given via --property wins over the value from --reader-config.
+        assertEquals(";", reader.keySeparator());
+        assertTrue(reader.parseKey());
+        assertTrue(reader.parseHeaders());
+    }
+
+    @Test
+    public void testBootstrapServerOverride() throws IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(BOOTSTRAP_SERVER_OVERRIDE);
+        ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
+
+        assertEquals(Collections.singletonList("localhost:1002"), producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+    }
+
+    @Test
+    public void testClientIdOverride() throws IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE);
+        ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
+
+        assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testDefaultClientId() throws IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(BOOTSTRAP_SERVER_VALID_ARGS);
+        ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
+
+        assertEquals("console-producer", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
+    }
+
+    @Test
+    public void testBatchSizeOverriddenByMaxPartitionMemoryBytesValue() throws IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE);
+        ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
+
+        assertEquals(456, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testBatchSizeSetAndMaxPartitionMemoryBytesNotSet() throws IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(BATCH_SIZE_SET_AND_MAX_PARTITION_MEMORY_BYTES_NOT_SET);
+        ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
+
+        assertEquals(123, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testDefaultBatchSize() throws IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(BATCH_SIZE_DEFAULT);
+        ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
+
+        assertEquals(16 * 1024, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testBatchSizeNotSetAndMaxPartitionMemoryBytesSet() throws IOException {
+        ConsoleProducerOptions opts = new ConsoleProducerOptions(BATCH_SIZE_NOT_SET_AND_MAX_PARTITION_MEMORY_BYTES_SET);
+        ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());
+
+        assertEquals(456, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
+    }
+
+    @Test
+    public void testNewReader() throws Exception {
+        ConsoleProducer producer = new ConsoleProducer();
+        TestRecordReader reader = (TestRecordReader) producer.messageReader(new ConsoleProducerOptions(TEST_RECORD_READER));
+
+        assertEquals(1, reader.configureCount());
+        assertEquals(0, reader.closeCount());
+
+        reader.close();
+        assertEquals(1, reader.closeCount());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testLoopReader() throws Exception {
+        ConsoleProducer producer = new ConsoleProducer();
+        TestRecordReader reader = (TestRecordReader) producer.messageReader(new ConsoleProducerOptions(TEST_RECORD_READER));
+
+        producer.loopReader(Mockito.mock(Producer.class), reader, false);
+
+        assertEquals(1, reader.configureCount());
+        assertEquals(1, reader.closeCount());
+    }
+
+    /**
+     * A {@link RecordReader} yielding no records that counts how often it is configured and closed.
+     */
+    public static class TestRecordReader implements RecordReader {
+        private int configureCount = 0;
+        private int closeCount = 0;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            configureCount += 1;
+        }
+
+        @Override
+        public Iterator<ProducerRecord<byte[], byte[]>> readRecords(InputStream inputStream) {
+            return Collections.emptyIterator();
+        }
+
+        @Override
+        public void close() {
+            closeCount += 1;
+        }
+
+        public int configureCount() {
+            return configureCount;
+        }
+
+        public int closeCount() {
+            return closeCount;
+        }
+    }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/LineMessageReaderTest.java
b/tools/src/test/java/org/apache/kafka/tools/LineMessageReaderTest.java
new file mode 100644
index 00000000000..5864869df1a
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/LineMessageReaderTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.tools.api.RecordReader;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.propsToStringMap;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link LineMessageReader}. Input text is always encoded, and record bytes decoded, as UTF-8
+ * to match the charset the reader uses, independently of the platform default charset.
+ */
+public class LineMessageReaderTest {
+    @Test
+    public void testLineReader() {
+        String input = "key0\tvalue0\nkey1\tvalue1";
+
+        Properties props = defaultTestProps();
+        props.put("parse.headers", "false");
+
+        runTest(props, input, record("key0", "value0"), record("key1", "value1"));
+    }
+
+    @Test
+    public void testLineReaderHeader() {
+        String input = "headerKey0:headerValue0,headerKey1:headerValue1\tkey0\tvalue0\n";
+        ProducerRecord<String, String> expected = record(
+            "key0",
+            "value0",
+            asList(
+                new RecordHeader("headerKey0", "headerValue0".getBytes(UTF_8)),
+                new RecordHeader("headerKey1", "headerValue1".getBytes(UTF_8))
+            )
+        );
+        runTest(defaultTestProps(), input, expected);
+    }
+
+    @Test
+    public void testMinimalValidInputWithHeaderKeyAndValue() {
+        runTest(defaultTestProps(), ":\t\t", record("", "", singletonList(new RecordHeader("", "".getBytes(UTF_8)))));
+    }
+
+    @Test
+    public void testKeyMissingValue() {
+        Properties props = defaultTestProps();
+        props.put("parse.headers", "false");
+        runTest(props, "key\t", record("key", ""));
+    }
+
+    @Test
+    public void testDemarcationsLongerThanOne() {
+        Properties props = defaultTestProps();
+        props.put("key.separator", "\t\t");
+        props.put("headers.delimiter", "\t\t");
+        props.put("headers.separator", "---");
+        props.put("headers.key.separator", "::::");
+
+        runTest(
+            props,
+            "headerKey0.0::::headerValue0.0---headerKey1.0::::\t\tkey\t\tvalue",
+            record("key",
+                "value",
+                asList(
+                    new RecordHeader("headerKey0.0", "headerValue0.0".getBytes(UTF_8)),
+                    new RecordHeader("headerKey1.0", "".getBytes(UTF_8))
+                )
+            )
+        );
+    }
+
+    @Test
+    public void testLineReaderHeaderNoKey() {
+        String input = "headerKey:headerValue\tvalue\n";
+
+        Properties props = defaultTestProps();
+        props.put("parse.key", "false");
+
+        runTest(props, input, record(null, "value", singletonList(new RecordHeader("headerKey", "headerValue".getBytes(UTF_8)))));
+    }
+
+    @Test
+    public void testLineReaderOnlyValue() {
+        Properties props = defaultTestProps();
+        props.put("parse.key", "false");
+        props.put("parse.headers", "false");
+
+        runTest(props, "value\n", record(null, "value"));
+    }
+
+    @Test
+    public void testParseHeaderEnabledWithCustomDelimiterAndVaryingNumberOfKeyValueHeaderPairs() {
+        Properties props = defaultTestProps();
+        props.put("key.separator", "#");
+        props.put("headers.delimiter", "!");
+        props.put("headers.separator", "&");
+        props.put("headers.key.separator", ":");
+
+        String input =
+            "headerKey0.0:headerValue0.0&headerKey0.1:headerValue0.1!key0#value0\n" +
+            "headerKey1.0:headerValue1.0!key1#value1";
+
+        ProducerRecord<String, String> record0 = record(
+            "key0",
+            "value0",
+            asList(
+                new RecordHeader("headerKey0.0", "headerValue0.0".getBytes(UTF_8)),
+                new RecordHeader("headerKey0.1", "headerValue0.1".getBytes(UTF_8))
+            )
+        );
+        ProducerRecord<String, String> record1 = record(
+            "key1",
+            "value1",
+            singletonList(new RecordHeader("headerKey1.0", "headerValue1.0".getBytes(UTF_8)))
+        );
+
+        runTest(props, input, record0, record1);
+    }
+
+    @Test
+    public void testMissingKeySeparator() {
+        RecordReader lineReader = new LineMessageReader();
+        String input =
+            "headerKey0.0:headerValue0.0,headerKey0.1:headerValue0.1\tkey0\tvalue0\n" +
+            "headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1";
+
+        lineReader.configure(propsToStringMap(defaultTestProps()));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes(UTF_8)));
+        iter.next();
+
+        KafkaException expectedException = assertThrows(KafkaException.class, iter::next);
+
+        assertEquals(
+            "No key separator found on line number 2: 'headerKey1.0:headerValue1.0\tkey1[MISSING-DELIMITER]value1'",
+            expectedException.getMessage()
+        );
+    }
+
+    @Test
+    public void testMissingHeaderKeySeparator() {
+        RecordReader lineReader = new LineMessageReader();
+        String input = "key[MISSING-DELIMITER]val\tkey0\tvalue0\n";
+        lineReader.configure(propsToStringMap(defaultTestProps()));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes(UTF_8)));
+
+        KafkaException expectedException = assertThrows(KafkaException.class, iter::next);
+
+        assertEquals(
+            "No header key separator found in pair 'key[MISSING-DELIMITER]val' on line number 1",
+            expectedException.getMessage()
+        );
+    }
+
+    @Test
+    public void testHeaderDemarcationCollision() {
+        Properties props = defaultTestProps();
+        props.put("headers.delimiter", "\t");
+        props.put("headers.separator", "\t");
+        props.put("headers.key.separator", "\t");
+
+        assertThrowsOnInvalidPatternConfig(props, "headers.delimiter and headers.separator may not be equal");
+
+        props.put("headers.separator", ",");
+        assertThrowsOnInvalidPatternConfig(props, "headers.delimiter and headers.key.separator may not be equal");
+
+        props.put("headers.key.separator", ",");
+        assertThrowsOnInvalidPatternConfig(props, "headers.separator and headers.key.separator may not be equal");
+    }
+
+    @Test
+    public void testIgnoreErrorInInput() {
+        String input =
+            "headerKey0.0:headerValue0.0\tkey0\tvalue0\n" +
+            "headerKey1.0:headerValue1.0,headerKey1.1:headerValue1.1[MISSING-HEADER-DELIMITER]key1\tvalue1\n" +
+            "headerKey2.0:headerValue2.0\tkey2[MISSING-KEY-DELIMITER]value2\n" +
+            "headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3\n";
+
+        Properties props = defaultTestProps();
+        props.put("ignore.error", "true");
+
+        ProducerRecord<String, String> validRecord = record("key0", "value0",
+            singletonList(new RecordHeader("headerKey0.0", "headerValue0.0".getBytes(UTF_8))));
+
+        ProducerRecord<String, String> missingHeaderDelimiter = record(
+            null,
+            "value1",
+            asList(
+                new RecordHeader("headerKey1.0", "headerValue1.0".getBytes(UTF_8)),
+                new RecordHeader("headerKey1.1", "headerValue1.1[MISSING-HEADER-DELIMITER]key1".getBytes(UTF_8))
+            )
+        );
+
+        ProducerRecord<String, String> missingKeyDelimiter = record(
+            null,
+            "key2[MISSING-KEY-DELIMITER]value2",
+            singletonList(new RecordHeader("headerKey2.0", "headerValue2.0".getBytes(UTF_8)))
+        );
+
+        ProducerRecord<String, String> missingKeyHeaderDelimiter = record(
+            null,
+            "headerKey3.0:headerValue3.0[MISSING-HEADER-DELIMITER]key3[MISSING-KEY-DELIMITER]value3",
+            Collections.emptyList()
+        );
+
+        runTest(props, input, validRecord, missingHeaderDelimiter, missingKeyDelimiter, missingKeyHeaderDelimiter);
+    }
+
+    @Test
+    public void testMalformedHeaderIgnoreError() {
+        String input = "key-val\tkey0\tvalue0\n";
+
+        Properties props = defaultTestProps();
+        props.put("ignore.error", "true");
+
+        ProducerRecord<String, String> expected = record("key0", "value0", singletonList(new RecordHeader("key-val", null)));
+
+        runTest(props, input, expected);
+    }
+
+    @Test
+    public void testNullMarker() {
+        String input =
+            "key\t\n" +
+            "key\t<NULL>\n" +
+            "key\t<NULL>value\n" +
+            "<NULL>\tvalue\n" +
+            "<NULL>\t<NULL>";
+
+        Properties props = defaultTestProps();
+        props.put("null.marker", "<NULL>");
+        props.put("parse.headers", "false");
+        runTest(props, input,
+            record("key", ""),
+            record("key", null),
+            record("key", "<NULL>value"),
+            record(null, "value"),
+            record(null, null));
+
+        // If the null marker is not set
+        props.remove("null.marker");
+        runTest(props, input,
+            record("key", ""),
+            record("key", "<NULL>"),
+            record("key", "<NULL>value"),
+            record("<NULL>", "value"),
+            record("<NULL>", "<NULL>"));
+    }
+
+    @Test
+    public void testNullMarkerWithHeaders() {
+        String input =
+            "h0:v0,h1:v1\t<NULL>\tvalue\n" +
+            "<NULL>\tkey\t<NULL>\n" +
+            "h0:,h1:v1\t<NULL>\t<NULL>\n" +
+            "h0:<NULL>,h1:v1\tkey\t<NULL>\n" +
+            "h0:<NULL>,h1:<NULL>value\tkey\t<NULL>\n";
+        Header header = new RecordHeader("h1", "v1".getBytes(UTF_8));
+
+        Properties props = defaultTestProps();
+        props.put("null.marker", "<NULL>");
+        runTest(props, input,
+            record(null, "value", asList(new RecordHeader("h0", "v0".getBytes(UTF_8)), header)),
+            record("key", null),
+            record(null, null, asList(new RecordHeader("h0", "".getBytes(UTF_8)), header)),
+            record("key", null, asList(new RecordHeader("h0", null), header)),
+            record("key", null, asList(new RecordHeader("h0", null), new RecordHeader("h1", "<NULL>value".getBytes(UTF_8))))
+        );
+
+        // If the null marker is not set
+        RecordReader lineReader = new LineMessageReader();
+        props.remove("null.marker");
+        lineReader.configure(propsToStringMap(props));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes(UTF_8)));
+        assertRecordEquals(record("<NULL>", "value", asList(new RecordHeader("h0", "v0".getBytes(UTF_8)), header)), iter.next());
+        // line 2 is not valid anymore
+        KafkaException expectedException = assertThrows(KafkaException.class, iter::next);
+        assertEquals(
+            "No header key separator found in pair '<NULL>' on line number 2",
+            expectedException.getMessage()
+        );
+        assertRecordEquals(record("<NULL>", "<NULL>", asList(new RecordHeader("h0", "".getBytes(UTF_8)), header)), iter.next());
+        assertRecordEquals(record("key", "<NULL>", asList(new RecordHeader("h0", "<NULL>".getBytes(UTF_8)), header)), iter.next());
+        assertRecordEquals(record("key", "<NULL>", asList(
+            new RecordHeader("h0", "<NULL>".getBytes(UTF_8)),
+            new RecordHeader("h1", "<NULL>value".getBytes(UTF_8)))), iter.next()
+        );
+    }
+
+    @Test
+    public void testNullMarkerHeaderKeyThrows() {
+        String input = "<NULL>:v0,h1:v1\tkey\tvalue\n";
+
+        Properties props = defaultTestProps();
+        props.put("null.marker", "<NULL>");
+        RecordReader lineReader = new LineMessageReader();
+        lineReader.configure(propsToStringMap(props));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes(UTF_8)));
+        KafkaException expectedException = assertThrows(KafkaException.class, iter::next);
+        assertEquals(
+            "Header keys should not be equal to the null marker '<NULL>' as they can't be null",
+            expectedException.getMessage()
+        );
+
+        // If the null marker is not set
+        props.remove("null.marker");
+        runTest(props, input, record("key", "value", asList(
+            new RecordHeader("<NULL>", "v0".getBytes(UTF_8)),
+            new RecordHeader("h1", "v1".getBytes(UTF_8))))
+        );
+    }
+
+    @Test
+    public void testInvalidNullMarker() {
+        Properties props = defaultTestProps();
+        props.put("headers.delimiter", "-");
+        props.put("headers.separator", ":");
+        props.put("headers.key.separator", "/");
+
+        props.put("null.marker", "-");
+        assertThrowsOnInvalidPatternConfig(props, "null.marker and headers.delimiter may not be equal");
+
+        props.put("null.marker", ":");
+        assertThrowsOnInvalidPatternConfig(props, "null.marker and headers.separator may not be equal");
+
+        props.put("null.marker", "/");
+        assertThrowsOnInvalidPatternConfig(props, "null.marker and headers.key.separator may not be equal");
+    }
+
+    // Base reader configuration: topic "topic", with key and header parsing enabled.
+    private Properties defaultTestProps() {
+        Properties props = new Properties();
+        props.put("topic", "topic");
+        props.put("parse.key", "true");
+        props.put("parse.headers", "true");
+
+        return props;
+    }
+
+    // Asserts that configuring a reader with props fails with exactly expectedMessage.
+    private void assertThrowsOnInvalidPatternConfig(Properties props, String expectedMessage) {
+        KafkaException exception = assertThrows(KafkaException.class, () -> new LineMessageReader().configure(propsToStringMap(props)));
+        assertEquals(expectedMessage, exception.getMessage());
+    }
+
+    // Reads input with a reader configured from props and checks it yields exactly expectedRecords, in order.
+    @SafeVarargs
+    private final void runTest(Properties props, String input, ProducerRecord<String, String>... expectedRecords) {
+        RecordReader lineReader = new LineMessageReader();
+        lineReader.configure(propsToStringMap(props));
+        Iterator<ProducerRecord<byte[], byte[]>> iter = lineReader.readRecords(new ByteArrayInputStream(input.getBytes(UTF_8)));
+
+        for (ProducerRecord<String, String> record : expectedRecords) {
+            assertRecordEquals(record, iter.next());
+        }
+
+        assertFalse(iter.hasNext());
+        assertThrows(NoSuchElementException.class, iter::next);
+    }
+
+    // ProducerRecord.equals compares the header collections by reference, so key, value and
+    // headers are compared field by field instead (byte payloads decoded as UTF-8).
+    private <K, V> void assertRecordEquals(ProducerRecord<K, V> expected, ProducerRecord<byte[], byte[]> actual) {
+        assertEquals(expected.key(), actual.key() == null ? null : new String(actual.key(), UTF_8));
+        assertEquals(expected.value(), actual.value() == null ? null : new String(actual.value(), UTF_8));
+        assertArrayEquals(expected.headers().toArray(), actual.headers().toArray());
+    }
+
+    private <K, V> ProducerRecord<K, V> record(K key, V value, List<Header> headers) {
+        ProducerRecord<K, V> record = new ProducerRecord<>("topic", key, value);
+        headers.forEach(h -> record.headers().add(h.key(), h.value()));
+
+        return record;
+    }
+
+    private <K, V> ProducerRecord<K, V> record(K key, V value) {
+        return new ProducerRecord<>("topic", key, value);
+    }
+}
diff --git
a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java
b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java
index 305e68af7ad..f8340ddcda1 100644
--- a/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java
+++ b/tools/tools-api/src/main/java/org/apache/kafka/tools/api/RecordReader.java
@@ -28,7 +28,7 @@ import java.util.Map;
* Typical implementations of this interface convert data from an
`InputStream` received via `readRecords` into a
* iterator of `ProducerRecord` instance. Note that implementations must have
a public nullary constructor.
*
- * This is used by the `kafka.tools.ConsoleProducer`.
+ * This is used by the `org.apache.kafka.tools.ConsoleProducer`.
*/
public interface RecordReader extends Closeable, Configurable {