Repository: kafka
Updated Branches:
  refs/heads/trunk a8794d8a5 -> 6f5930d63
KAFKA-5278: ConsoleConsumer should honor `--value-deserializer`

In the original implementation, console-consumer fails to honor `--value-deserializer` config.

Author: amethystic <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #3100 from amethystic/KAFKA-5278

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f5930d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f5930d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f5930d6

Branch: refs/heads/trunk
Commit: 6f5930d631a7d3fc090cb81e5eb9cd69580f142b
Parents: a8794d8
Author: amethystic <[email protected]>
Authored: Tue May 30 00:12:54 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue May 30 00:12:54 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f5930d6/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 6d27e85..8a41386 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -198,8 +198,8 @@ object ConsoleConsumer extends Logging {
     props.putAll(config.extraConsumerProps)
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if (config.valueDeserializer != null) config.valueDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
 
     props
   }
@@ -315,6 +315,13 @@ object ConsoleConsumer extends Logging {
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+
+    if (keyDeserializer != null && !keyDeserializer.isEmpty) {
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
+    }
+    if (valueDeserializer != null && !valueDeserializer.isEmpty) {
+      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
+    }
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {
