This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4877a94885a KAFKA-14146: Config file option for
MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840)
(#12175)
4877a94885a is described below
commit 4877a94885ab8362e1934711e4ccbef027b847f7
Author: Alexandre Garnier <[email protected]>
AuthorDate: Fri Dec 2 17:03:17 2022 +0100
KAFKA-14146: Config file option for MessageReader/MessageFormatter in
ConsoleProducer/ConsoleConsumer (KIP-840) (#12175)
This patch implements KIP-840 as outlined here:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884652.
Reviewers: David Jacot <[email protected]>
---
.../main/scala/kafka/tools/ConsoleConsumer.scala | 10 ++++++-
.../main/scala/kafka/tools/ConsoleProducer.scala | 9 +++++-
.../unit/kafka/tools/ConsoleConsumerTest.scala | 33 +++++++++++++++++++--
.../unit/kafka/tools/ConsoleProducerTest.scala | 34 ++++++++++++++++++----
4 files changed, 76 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 0162998caa3..a70ce920e8e 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -248,6 +248,10 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
+ val messageFormatterConfigOpt = parser.accepts("formatter-config",
s"Config properties file to initialize the message formatter. Note that
$messageFormatterArgOpt takes precedence over this config.")
+ .withRequiredArg
+ .describedAs("config file")
+ .ofType(classOf[String])
val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer
does not already have an established offset to consume from, " +
"start with the earliest message present in the log rather than the
latest message.")
val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of
messages to consume before exiting. If not set, consumption is continual.")
@@ -307,7 +311,11 @@ object ConsoleConsumer extends Logging {
val partitionArg = if (options.has(partitionIdOpt))
Some(options.valueOf(partitionIdOpt).intValue) else None
val skipMessageOnError = options.has(skipMessageOnErrorOpt)
val messageFormatterClass =
Class.forName(options.valueOf(messageFormatterOpt))
- val formatterArgs =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
+ val formatterArgs = if (options.has(messageFormatterConfigOpt))
+ Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
+ else
+ new Properties()
+ formatterArgs ++=
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
val maxMessages = if (options.has(maxMessagesOpt))
options.valueOf(maxMessagesOpt).intValue else -1
val timeoutMs = if (options.has(timeoutMsOpt))
options.valueOf(timeoutMsOpt).intValue else -1
val bootstrapServer = options.valueOf(bootstrapServerOpt)
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 302e7f870a2..27c6fb682b0 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -71,7 +71,10 @@ object ConsoleProducer {
}
def getReaderProps(config: ProducerConfig): Properties = {
- val props = new Properties
+ val props =
+ if (config.options.has(config.readerConfigOpt))
+ Utils.loadProps(config.options.valueOf(config.readerConfigOpt))
+ else new Properties
props.put("topic", config.topic)
props ++= config.cmdLineProps
props
@@ -241,6 +244,10 @@ object ConsoleProducer {
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
+ val readerConfigOpt = parser.accepts("reader-config", s"Config properties
file for the message reader. Note that $propertyOpt takes precedence over this
config.")
+ .withRequiredArg
+ .describedAs("config file")
+ .ofType(classOf[String])
val producerPropertyOpt = parser.accepts("producer-property", "A mechanism
to pass user-defined properties in the form key=value to the producer. ")
.withRequiredArg
.describedAs("producer_prop")
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 686346e232b..9ad65a356ad 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -483,9 +483,36 @@ class ConsoleConsumerTest {
assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
- assertEquals(1,
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.size)
- assertEquals("abc",
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.get("my-props"))
-
assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
+ val keyDeserializer =
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]
+ assertEquals(1, keyDeserializer.configs.size)
+ assertEquals("abc", keyDeserializer.configs.get("my-props"))
+ assertTrue(keyDeserializer.isKey)
+ }
+
+ @Test
+ def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ propsStream.write("key.deserializer.my-props=abc\n".getBytes())
+ propsStream.write("print.key=false".getBytes())
+ propsStream.close()
+
+ val args = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "print.key=true",
+ "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
+ "--formatter-config", propsFile.getAbsolutePath
+ )
+ val config = new ConsoleConsumer.ConsumerConfig(args)
+ assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
+ assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
+ val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
+ assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
+ val keyDeserializer =
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]
+ assertEquals(1, keyDeserializer.configs.size)
+ assertEquals("abc", keyDeserializer.configs.get("my-props"))
+ assertTrue(keyDeserializer.isKey)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index f136c62b5ff..8a594f92a30 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -17,10 +17,11 @@
package kafka.tools
+import java.nio.file.Files
import kafka.tools.ConsoleProducer.LineMessageReader
-import kafka.utils.Exit
+import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.producer.ProducerConfig
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.Test
import java.util
@@ -135,9 +136,32 @@ class ConsoleProducerTest {
def testParseKeyProp(): Unit = {
val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
val reader =
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
- reader.init(System.in,ConsoleProducer.getReaderProps(config))
- assert(reader.keySeparator == "#")
- assert(reader.parseKey)
+ reader.init(System.in, ConsoleProducer.getReaderProps(config))
+ assertTrue(reader.keySeparator == "#")
+ assertTrue(reader.parseKey)
+ }
+
+ @Test
+ def testParseReaderConfigFile(): Unit = {
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ propsStream.write("parse.key=true\n".getBytes())
+ propsStream.write("key.separator=|".getBytes())
+ propsStream.close()
+
+ val args = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "key.separator=;",
+ "--property", "parse.headers=true",
+ "--reader-config", propsFile.getAbsolutePath
+ )
+ val config = new ConsoleProducer.ProducerConfig(args)
+ val reader =
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
+ reader.init(System.in, ConsoleProducer.getReaderProps(config))
+ assertEquals(";", reader.keySeparator)
+ assertTrue(reader.parseKey)
+ assertTrue(reader.parseHeaders)
}
@Test