Repository: kafka Updated Branches: refs/heads/trunk d9f052acc -> 06a57cf19
KAFKA-3748: Add consumer-property to console tools consumer ijuma harshach edoardocomar Can you please review the changes. edoardocomar I have addressed your comment of extra space. Author: Bharat Viswanadham <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1474 from bharatviswa504/Kafka-3748 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/06a57cf1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/06a57cf1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/06a57cf1 Branch: refs/heads/trunk Commit: 06a57cf19c45e82245ada886ea885087ce60ab80 Parents: d9f052a Author: Bharat Viswanadham <[email protected]> Authored: Mon Jun 6 23:58:29 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Jun 6 23:58:29 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/06a57cf1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 3b7a214..c1b5aee 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -167,6 +167,7 @@ object ConsoleConsumer extends Logging { val props = new Properties props.putAll(config.consumerProps) + props.putAll(config.extraConsumerProps) props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest") props.put("zookeeper.connect", config.zkConnectionStr) @@ -189,6 +190,7 @@ object ConsoleConsumer extends Logging { val props = new Properties props.putAll(config.consumerProps) + props.putAll(config.extraConsumerProps) props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest") props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer") @@ -216,7 +218,11 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("urls") .ofType(classOf[String]) - val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + val consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.") + .withRequiredArg + .describedAs("consumer_prop") + .ofType(classOf[String]) + val consumerConfigOpt = parser.accepts("consumer.config", s"Consumer config properties file. Note that ${consumerPropertyOpt} takes precedence over this config.") .withRequiredArg .describedAs("config file") .ofType(classOf[String]) @@ -291,6 +297,7 @@ object ConsoleConsumer extends Logging { topicArg = options.valueOf(topicOrFilterOpt.head) filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) } + val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala) val consumerProps = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else
