Repository: kafka
Updated Branches:
  refs/heads/0.11.0 9dd7db2f5 -> 0521c65bc
KAFKA-5278: ConsoleConsumer should honor `--value-deserializer`

In the original implementation, console-consumer fails to honor `--value-deserializer` config.

Author: amethystic <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #3100 from amethystic/KAFKA-5278

(cherry picked from commit 6f5930d631a7d3fc090cb81e5eb9cd69580f142b)
Signed-off-by: Guozhang Wang <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0521c65b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0521c65b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0521c65b

Branch: refs/heads/0.11.0
Commit: 0521c65bcf89684df82873d219436226adda3e07
Parents: 9dd7db2
Author: amethystic <[email protected]>
Authored: Tue May 30 00:12:54 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue May 30 00:13:05 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/0521c65b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 6d27e85..8a41386 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -198,8 +198,8 @@ object ConsoleConsumer extends Logging {
     props.putAll(config.extraConsumerProps)
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if (config.valueDeserializer != null) config.valueDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
 
     props
   }
@@ -315,6 +315,13 @@ object ConsoleConsumer extends Logging {
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+
+    if (keyDeserializer != null && !keyDeserializer.isEmpty) {
+      formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
+    }
+    if (valueDeserializer != null && !valueDeserializer.isEmpty) {
+      formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
+    }
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {
