This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4877a94885a KAFKA-14146: Config file option for 
MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer (KIP-840) 
(#12175)
4877a94885a is described below

commit 4877a94885ab8362e1934711e4ccbef027b847f7
Author: Alexandre Garnier <[email protected]>
AuthorDate: Fri Dec 2 17:03:17 2022 +0100

    KAFKA-14146: Config file option for MessageReader/MessageFormatter in 
ConsoleProducer/ConsoleConsumer (KIP-840) (#12175)
    
    This patch implements KIP-840 as outlined here: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884652.
    
    Reviewers: David Jacot <[email protected]>
---
 .../main/scala/kafka/tools/ConsoleConsumer.scala   | 10 ++++++-
 .../main/scala/kafka/tools/ConsoleProducer.scala   |  9 +++++-
 .../unit/kafka/tools/ConsoleConsumerTest.scala     | 33 +++++++++++++++++++--
 .../unit/kafka/tools/ConsoleProducerTest.scala     | 34 ++++++++++++++++++----
 4 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 0162998caa3..a70ce920e8e 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -248,6 +248,10 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
+    val messageFormatterConfigOpt = parser.accepts("formatter-config", 
s"Config properties file to initialize the message formatter. Note that 
$messageFormatterArgOpt takes precedence over this config.")
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer 
does not already have an established offset to consume from, " +
       "start with the earliest message present in the log rather than the 
latest message.")
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of 
messages to consume before exiting. If not set, consumption is continual.")
@@ -307,7 +311,11 @@ object ConsoleConsumer extends Logging {
     val partitionArg = if (options.has(partitionIdOpt)) 
Some(options.valueOf(partitionIdOpt).intValue) else None
     val skipMessageOnError = options.has(skipMessageOnErrorOpt)
     val messageFormatterClass = 
Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
+    val formatterArgs = if (options.has(messageFormatterConfigOpt))
+      Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
+    else
+      new Properties()
+    formatterArgs ++= 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
     val maxMessages = if (options.has(maxMessagesOpt)) 
options.valueOf(maxMessagesOpt).intValue else -1
     val timeoutMs = if (options.has(timeoutMsOpt)) 
options.valueOf(timeoutMsOpt).intValue else -1
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 302e7f870a2..27c6fb682b0 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -71,7 +71,10 @@ object ConsoleProducer {
   }
 
   def getReaderProps(config: ProducerConfig): Properties = {
-    val props = new Properties
+    val props =
+      if (config.options.has(config.readerConfigOpt))
+        Utils.loadProps(config.options.valueOf(config.readerConfigOpt))
+      else new Properties
     props.put("topic", config.topic)
     props ++= config.cmdLineProps
     props
@@ -241,6 +244,10 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
+    val readerConfigOpt = parser.accepts("reader-config", s"Config properties 
file for the message reader. Note that $propertyOpt takes precedence over this 
config.")
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
     val producerPropertyOpt = parser.accepts("producer-property", "A mechanism 
to pass user-defined properties in the form key=value to the producer. ")
             .withRequiredArg
             .describedAs("producer_prop")
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 686346e232b..9ad65a356ad 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -483,9 +483,36 @@ class ConsoleConsumerTest {
     assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
     val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
     assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
-    assertEquals(1, 
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.size)
-    assertEquals("abc", 
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.get("my-props"))
-    
assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
+    val keyDeserializer = 
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]
+    assertEquals(1, keyDeserializer.configs.size)
+    assertEquals("abc", keyDeserializer.configs.get("my-props"))
+    assertTrue(keyDeserializer.isKey)
+  }
+
+  @Test
+  def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    propsStream.write("key.deserializer.my-props=abc\n".getBytes())
+    propsStream.write("print.key=false".getBytes())
+    propsStream.close()
+
+    val args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--property", "print.key=true",
+      "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
+      "--formatter-config", propsFile.getAbsolutePath
+    )
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
+    assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
+    val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
+    assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
+    val keyDeserializer = 
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]
+    assertEquals(1, keyDeserializer.configs.size)
+    assertEquals("abc", keyDeserializer.configs.get("my-props"))
+    assertTrue(keyDeserializer.isKey)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index f136c62b5ff..8a594f92a30 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
@@ -17,10 +17,11 @@
 
 package kafka.tools
 
+import java.nio.file.Files
 import kafka.tools.ConsoleProducer.LineMessageReader
-import kafka.utils.Exit
+import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.Test
 
 import java.util
@@ -135,9 +136,32 @@ class ConsoleProducerTest {
   def testParseKeyProp(): Unit = {
     val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
     val reader = 
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
-    reader.init(System.in,ConsoleProducer.getReaderProps(config))
-    assert(reader.keySeparator == "#")
-    assert(reader.parseKey)
+    reader.init(System.in, ConsoleProducer.getReaderProps(config))
+    assertTrue(reader.keySeparator == "#")
+    assertTrue(reader.parseKey)
+  }
+
+  @Test
+  def testParseReaderConfigFile(): Unit = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    propsStream.write("parse.key=true\n".getBytes())
+    propsStream.write("key.separator=|".getBytes())
+    propsStream.close()
+
+    val args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--property", "key.separator=;",
+      "--property", "parse.headers=true",
+      "--reader-config", propsFile.getAbsolutePath
+    )
+    val config = new ConsoleProducer.ProducerConfig(args)
+    val reader = 
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
+    reader.init(System.in, ConsoleProducer.getReaderProps(config))
+    assertEquals(";", reader.keySeparator)
+    assertTrue(reader.parseKey)
+    assertTrue(reader.parseHeaders)
   }
 
   @Test

Reply via email to