dajac commented on code in PR #12175:
URL: https://github.com/apache/kafka/pull/12175#discussion_r950125824
##########
core/src/main/scala/kafka/tools/ConsoleConsumer.scala:
##########
@@ -307,7 +311,11 @@ object ConsoleConsumer extends Logging {
val partitionArg = if (options.has(partitionIdOpt))
Some(options.valueOf(partitionIdOpt).intValue) else None
val skipMessageOnError = options.has(skipMessageOnErrorOpt)
val messageFormatterClass =
Class.forName(options.valueOf(messageFormatterOpt))
- val formatterArgs =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
+ val formatterArgs = if (options.has(messageFormatterConfigOpt))
+ Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
+ else
+ new Properties()
Review Comment:
nit: I think that you can omit the `()` here.
##########
core/src/main/scala/kafka/tools/ConsoleConsumer.scala:
##########
@@ -307,7 +311,11 @@ object ConsoleConsumer extends Logging {
val partitionArg = if (options.has(partitionIdOpt))
Some(options.valueOf(partitionIdOpt).intValue) else None
val skipMessageOnError = options.has(skipMessageOnErrorOpt)
val messageFormatterClass =
Class.forName(options.valueOf(messageFormatterOpt))
- val formatterArgs =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
+ val formatterArgs = if (options.has(messageFormatterConfigOpt))
Review Comment:
nit: There is an extra space before `if`.
##########
core/src/main/scala/kafka/tools/ConsoleProducer.scala:
##########
@@ -71,7 +71,10 @@ object ConsoleProducer {
}
def getReaderProps(config: ProducerConfig): Properties = {
- val props = new Properties
+ val props =
+ if (config.options.has(config.readerConfigOpt))
+ Utils.loadProps(config.options.valueOf(config.readerConfigOpt))
+ else new Properties
Review Comment:
nit: Could we format it as follow?
```
val props = if (config.options.has(config.readerConfigOpt))
Utils.loadProps(config.options.valueOf(config.readerConfigOpt))
else
new Properties
```
##########
core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala:
##########
@@ -140,6 +141,29 @@ class ConsoleProducerTest {
assert(reader.parseKey)
}
+ @Test
+ def testParseConfigFile (): Unit = {
Review Comment:
nit: Should we name it `testParseReaderConfigFile`? There is also an extra
space before `()`.
##########
core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala:
##########
@@ -488,6 +488,31 @@ class ConsoleConsumerTest {
assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
}
+ @Test
+ def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ propsStream.write("key.deserializer.my-props=abc\n".getBytes())
+ propsStream.write("print.key=false".getBytes())
+ propsStream.close()
+
+ val args = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "print.key=true",
+ "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
+ "--formatter-config", propsFile.getAbsolutePath
+ )
+ val config = new ConsoleConsumer.ConsumerConfig(args)
+ assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
+ assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
+ val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
+ assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
+ assertEquals(1,
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.size)
Review Comment:
nit: Should we define `keyDeserializer =
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]` instead of having
`formatter.keyDeserializer.get.asInstanceOf[MockDeserializer]` in all
assertions?
##########
core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala:
##########
@@ -140,6 +141,29 @@ class ConsoleProducerTest {
assert(reader.parseKey)
}
+ @Test
+ def testParseConfigFile (): Unit = {
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ propsStream.write("parse.key=true\n".getBytes())
+ propsStream.write("key.separator=|".getBytes())
+ propsStream.close()
+
+ val args = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "key.separator=;",
+ "--property", "parse.headers=true",
+ "--reader-config", propsFile.getAbsolutePath
+ )
+ val config = new ConsoleProducer.ProducerConfig(args)
+ val reader =
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
+ reader.init(System.in,ConsoleProducer.getReaderProps(config))
Review Comment:
nit: We need to add a space after `,`.
##########
core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala:
##########
@@ -140,6 +141,29 @@ class ConsoleProducerTest {
assert(reader.parseKey)
}
+ @Test
+ def testParseConfigFile (): Unit = {
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ propsStream.write("parse.key=true\n".getBytes())
+ propsStream.write("key.separator=|".getBytes())
+ propsStream.close()
+
+ val args = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "key.separator=;",
+ "--property", "parse.headers=true",
+ "--reader-config", propsFile.getAbsolutePath
+ )
+ val config = new ConsoleProducer.ProducerConfig(args)
+ val reader =
Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
+ reader.init(System.in,ConsoleProducer.getReaderProps(config))
+ assertEquals(";", reader.keySeparator)
+ assert(reader.parseKey)
+ assert(reader.parseHeaders)
Review Comment:
nit: Could we use `assertTrue`?
##########
core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala:
##########
@@ -488,6 +488,31 @@ class ConsoleConsumerTest {
assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
}
+ @Test
+ def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ propsStream.write("key.deserializer.my-props=abc\n".getBytes())
+ propsStream.write("print.key=false".getBytes())
+ propsStream.close()
+
+ val args = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--property", "print.key=true",
+ "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
+ "--formatter-config", propsFile.getAbsolutePath
+ )
+ val config = new ConsoleConsumer.ConsumerConfig(args)
+ assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
+ assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
Review Comment:
nit: I would remove this one because we implicitly verifies it with
`assertEquals("abc",
formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.get("my-props"))`.
What do you think?
##########
core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala:
##########
@@ -488,6 +488,31 @@ class ConsoleConsumerTest {
assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
}
+ @Test
+ def testCustomConfigShouldBePassedToConfigureMethod(): Unit = {
+ val propsFile = TestUtils.tempFile()
+ val propsStream = Files.newOutputStream(propsFile.toPath)
+ propsStream.write("key.deserializer.my-props=abc\n".getBytes())
+ propsStream.write("print.key=false".getBytes())
+ propsStream.close()
Review Comment:
I am actually surprised that we don't have an helper for doing this. I think
that this is something that we could add to `TestUtils`. Something like
`tempPropertiesFile(props: Map[String, String]): String`. If you are
interested, this is something that we could do in a follow up PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]