Repository: kafka Updated Branches: refs/heads/0.11.0 847098610 -> f9de919c1
KAFKA-3982: Fix processing order of some of the consumer properties This PR updates how the console consumer processes its input properties. For both the old and the new consumer, a value for `auto.offset.reset` that is provided indirectly through the `--consumer.config` or `--consumer-property` arguments now takes effect instead of being overwritten. For the new consumer, the precedence order for the `key.deserializer` and `value.deserializer` properties is now fixed: first the value provided directly as an argument (`--key-deserializer` / `--value-deserializer`), then the value provided indirectly via `--consumer-property`, then the value provided via `--consumer.config`, and finally the default value. Author: Vahid Hashemian <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1655 from vahidhashemian/KAFKA-3982 (cherry picked from commit b63e41ea78a58bdea78be33f90bfcb61ce5988d3) Signed-off-by: Guozhang Wang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9de919c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9de919c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9de919c Branch: refs/heads/0.11.0 Commit: f9de919c1644e1686d8217048f87fd508a9c21f8 Parents: 8470986 Author: Vahid Hashemian <[email protected]> Authored: Fri Jun 2 12:38:11 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Jun 2 12:38:17 2017 -0700 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsoleConsumer.scala | 34 ++++++++++++++++---- .../unit/kafka/tools/ConsoleConsumerTest.scala | 32 ++++++++++++++++++ 2 files changed, 60 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f9de919c/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 
a1e2ffa..664557a 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -174,7 +174,8 @@ object ConsoleConsumer extends Logging { props.putAll(config.consumerProps) props.putAll(config.extraConsumerProps) - props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest") + if (!props.containsKey("auto.offset.reset")) + props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest") props.put("zookeeper.connect", config.zkConnectionStr) if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) && @@ -197,7 +198,8 @@ object ConsoleConsumer extends Logging { props.putAll(config.consumerProps) props.putAll(config.extraConsumerProps) - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest") + if (!props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest") props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") @@ -327,12 +329,32 @@ object ConsoleConsumer extends Logging { val isolationLevel = options.valueOf(isolationLevelOpt).toString val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] - if (keyDeserializer != null && !keyDeserializer.isEmpty) { + if (keyDeserializer != null && !keyDeserializer.isEmpty) + // the argument that is provided directly takes precedence formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer) - } - if (valueDeserializer != null && 
!valueDeserializer.isEmpty) { + else if (extraConsumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) + // then the argument that is provided through --consumer-property + formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) + else if (consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) + // then the argument that is provided through --consumer.config + formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) + else + // the default is used if the argument is not provided directly or indirectly + formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") + + if (valueDeserializer != null && !valueDeserializer.isEmpty) + // the argument that is provided directly takes precedence formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer) - } + else if (extraConsumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) + // then the argument that is provided through --consumer-property + formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) + else if (consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) + // then the argument that is provided through --consumer.config + formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) + else + // the default is used if the argument is not provided directly or indirectly + formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") + formatter.init(formatterArgs) if (useOldConsumer) { 
http://git-wip-us.apache.org/repos/asf/kafka/blob/f9de919c/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index e0917a2..3cfb5a5 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -191,4 +191,36 @@ class ConsoleConsumerTest { assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms")) } + + @Test + def shouldOverwriteConfigFromConfigFileOrPropertiesWithConfigFromArguments() { + val propsFile = TestUtils.tempFile() + val propsStream = new FileOutputStream(propsFile) + propsStream.write("bootstrap.servers=localhost:9093\n".getBytes()) + propsStream.write("auto.offset.reset=earliest\n".getBytes()) + propsStream.write("key.deserializer=org.apache.kafka.common.serialization.LongDeserializer\n".getBytes()) + propsStream.write("value.deserializer=org.apache.kafka.common.serialization.LongDeserializer".getBytes()) + propsStream.close() + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--key-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer", + "--value-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer", + "--consumer-property", "auto.offset.reset=latest", + "--consumer-property", "key.deserializer=org.apache.kafka.common.serialization.FloatDeserializer", + "--consumer-property", "value.deserializer=org.apache.kafka.common.serialization.FloatDeserializer", + "--consumer.config", propsFile.getAbsolutePath + ) + + val config = new ConsoleConsumer.ConsumerConfig(args) + val props = ConsoleConsumer.getNewConsumerProps(config) + + assertEquals("localhost:9092", props.getProperty("bootstrap.servers")) + assertEquals("latest", 
props.getProperty("auto.offset.reset")) + assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("key.deserializer")) + assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("value.deserializer")) + // serde settings applies to message formatter only, not the consumer itself + assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("key.deserializer")) + assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("value.deserializer")) + } }
