Repository: kafka
Updated Branches:
  refs/heads/trunk 0a8b10e27 -> b63e41ea7


KAFKA-3982: Fix processing order of some of the consumer properties

This PR updates processing of console consumer's input properties.

For both old and new consumer, the value provided for `auto.offset.reset` 
indirectly through `consumer.config` or `consumer.property` arguments will now 
take effect.
For new consumer and for `key.deserializer` and `value.deserializer` 
properties, the precedence order is fixed to first the value directly provided 
as an argument, then the value provided indirectly via `consumer.property` and 
then `consumer.config`, and finally a default value.

Author: Vahid Hashemian <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1655 from vahidhashemian/KAFKA-3982


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b63e41ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b63e41ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b63e41ea

Branch: refs/heads/trunk
Commit: b63e41ea78a58bdea78be33f90bfcb61ce5988d3
Parents: 0a8b10e
Author: Vahid Hashemian <[email protected]>
Authored: Fri Jun 2 12:38:11 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Jun 2 12:38:11 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 34 ++++++++++++++++----
 .../unit/kafka/tools/ConsoleConsumerTest.scala  | 32 ++++++++++++++++++
 2 files changed, 60 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b63e41ea/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a1e2ffa..664557a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -174,7 +174,8 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else 
"largest")
+    if (!props.containsKey("auto.offset.reset"))
+      props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else 
"largest")
     props.put("zookeeper.connect", config.zkConnectionStr)
 
     if (!config.options.has(config.deleteConsumerOffsetsOpt) && 
config.options.has(config.resetBeginningOpt) &&
@@ -197,7 +198,8 @@ object ConsoleConsumer extends Logging {
 
     props.putAll(config.consumerProps)
     props.putAll(config.extraConsumerProps)
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if 
(config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
+    if (!props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if 
(config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -327,12 +329,32 @@ object ConsoleConsumer extends Logging {
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = 
messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
-    if (keyDeserializer != null && !keyDeserializer.isEmpty) {
+    if (keyDeserializer != null && !keyDeserializer.isEmpty)
+      // the argument that is provided directly takes precedence
       formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer)
-    }
-    if (valueDeserializer != null && !valueDeserializer.isEmpty) {
+    else if 
(extraConsumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+      // then the argument that is provided through --consumer-property
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
extraConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+    else if 
(consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+      // then the argument that is provided through --consumer.config
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
consumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+    else
+      // the default is used if the argument is not provided directly or 
indirectly
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
+    if (valueDeserializer != null && !valueDeserializer.isEmpty)
+      // the argument that is provided directly takes precedence
       
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer)
-    }
+    else if 
(extraConsumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+      // then the argument that is provided through --consumer-property
+      
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
extraConsumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+    else if 
(consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+      // then the argument that is provided through --consumer.config
+      
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
consumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
+    else
+      // the default is used if the argument is not provided directly or 
indirectly
+      
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b63e41ea/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index e0917a2..3cfb5a5 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -191,4 +191,36 @@ class ConsoleConsumerTest {
 
     assertEquals("1000", 
config.consumerProps.getProperty("request.timeout.ms"))
   }
+
+  @Test
+  def shouldOverwriteConfigFromConfigFileOrPropertiesWithConfigFromArguments() 
{
+    val propsFile = TestUtils.tempFile()
+    val propsStream = new FileOutputStream(propsFile)
+    propsStream.write("bootstrap.servers=localhost:9093\n".getBytes())
+    propsStream.write("auto.offset.reset=earliest\n".getBytes())
+    
propsStream.write("key.deserializer=org.apache.kafka.common.serialization.LongDeserializer\n".getBytes())
+    
propsStream.write("value.deserializer=org.apache.kafka.common.serialization.LongDeserializer".getBytes())
+    propsStream.close()
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--key-deserializer", 
"org.apache.kafka.common.serialization.DoubleDeserializer",
+      "--value-deserializer", 
"org.apache.kafka.common.serialization.DoubleDeserializer",
+      "--consumer-property", "auto.offset.reset=latest",
+      "--consumer-property", 
"key.deserializer=org.apache.kafka.common.serialization.FloatDeserializer",
+      "--consumer-property", 
"value.deserializer=org.apache.kafka.common.serialization.FloatDeserializer",
+      "--consumer.config", propsFile.getAbsolutePath
+    )
+
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    val props = ConsoleConsumer.getNewConsumerProps(config)
+
+    assertEquals("localhost:9092", props.getProperty("bootstrap.servers"))
+    assertEquals("latest", props.getProperty("auto.offset.reset"))
+    assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", 
config.formatterArgs.getProperty("key.deserializer"))
+    assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", 
config.formatterArgs.getProperty("value.deserializer"))
+    // serde settings applies to message formatter only, not the consumer 
itself
+    
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", 
props.getProperty("key.deserializer"))
+    
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", 
props.getProperty("value.deserializer"))
+  }
 }

Reply via email to