Repository: kafka
Updated Branches:
  refs/heads/trunk a8794d8a5 -> 6f5930d63


KAFKA-5278: ConsoleConsumer should honor `--value-deserializer`

In the original implementation, the console consumer fails to honor the
`--value-deserializer` option.

Author: amethystic <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #3100 from amethystic/KAFKA-5278


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f5930d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f5930d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f5930d6

Branch: refs/heads/trunk
Commit: 6f5930d631a7d3fc090cb81e5eb9cd69580f142b
Parents: a8794d8
Author: amethystic <[email protected]>
Authored: Tue May 30 00:12:54 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue May 30 00:12:54 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f5930d6/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 6d27e85..8a41386 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -198,8 +198,8 @@ object ConsoleConsumer extends Logging {
     props.putAll(config.extraConsumerProps)
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if 
(config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if 
(config.keyDeserializer != null) config.keyDeserializer else 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if 
(config.valueDeserializer != null) config.valueDeserializer else 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer")
 
     props
   }
@@ -315,6 +315,13 @@ object ConsoleConsumer extends Logging {
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
     val formatter: MessageFormatter = 
messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+
+    if (keyDeserializer != null && !keyDeserializer.isEmpty) {
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer)
+    }
+    if (valueDeserializer != null && !valueDeserializer.isEmpty) {
+      
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer)
+    }
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {

Reply via email to