Repository: kafka Updated Branches: refs/heads/0.11.0 f2673eb37 -> 27578d009
KAFKA-5373; Revert breaking change to console consumer This patch reverts b63e41ea78a58bdea78be33f90bfcb61ce5988d3 since it broke the console consumer -- the consumer prints the addresses of the messages instead of the contents with that patch. Author: Apurva Mehta <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3218 from apurvam/KAFKA-5373-fix-console-consumer (cherry picked from commit 8104c0de273fb7627c4172f18a609472186860fd) Signed-off-by: Ismael Juma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/27578d00 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/27578d00 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/27578d00 Branch: refs/heads/0.11.0 Commit: 27578d0096663993612849121fdc5dfb509a117f Parents: f2673eb Author: Apurva Mehta <[email protected]> Authored: Sat Jun 3 01:53:23 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat Jun 3 01:53:56 2017 +0100 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsoleConsumer.scala | 34 ++++---------------- .../unit/kafka/tools/ConsoleConsumerTest.scala | 32 ------------------ 2 files changed, 6 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/27578d00/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 664557a..a1e2ffa 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -174,8 +174,7 @@ object ConsoleConsumer extends Logging { props.putAll(config.consumerProps) props.putAll(config.extraConsumerProps) - if (!props.containsKey("auto.offset.reset")) - props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest") + props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest") props.put("zookeeper.connect", config.zkConnectionStr) if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) && @@ -198,8 +197,7 @@ object ConsoleConsumer extends Logging { props.putAll(config.consumerProps) props.putAll(config.extraConsumerProps) - if (!props.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest") + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest") props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") @@ -329,32 +327,12 @@ object ConsoleConsumer extends Logging { val isolationLevel = options.valueOf(isolationLevelOpt).toString val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] - if (keyDeserializer != null && !keyDeserializer.isEmpty) - // the argument that is provided directly takes precedence + if (keyDeserializer != null && !keyDeserializer.isEmpty) { formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer) - else if (extraConsumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) - // then the argument that is provided through --consumer-property - formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) - else if (consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) - // then the argument that is provided through --consumer.config - formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) - else - // the default is used if the argument is not provided directly or indirectly - formatterArgs.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - - if (valueDeserializer != null && !valueDeserializer.isEmpty) - // the argument that is provided directly takes precedence + } + if (valueDeserializer != null && !valueDeserializer.isEmpty) { formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer) - else if (extraConsumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) - // then the argument that is provided through --consumer-property - formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, extraConsumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) - else if (consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) - // then the argument that is provided through --consumer.config - formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerProps.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) - else - // the default is used if the argument is not provided directly or indirectly - formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - + } formatter.init(formatterArgs) if (useOldConsumer) { http://git-wip-us.apache.org/repos/asf/kafka/blob/27578d00/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 3cfb5a5..e0917a2 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -191,36 +191,4 @@ class ConsoleConsumerTest { assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms")) } - - @Test - def shouldOverwriteConfigFromConfigFileOrPropertiesWithConfigFromArguments() { - val propsFile = TestUtils.tempFile() - val propsStream = new FileOutputStream(propsFile) - propsStream.write("bootstrap.servers=localhost:9093\n".getBytes()) - propsStream.write("auto.offset.reset=earliest\n".getBytes()) - propsStream.write("key.deserializer=org.apache.kafka.common.serialization.LongDeserializer\n".getBytes()) - propsStream.write("value.deserializer=org.apache.kafka.common.serialization.LongDeserializer".getBytes()) - propsStream.close() - val args: Array[String] = Array( - "--bootstrap-server", "localhost:9092", - "--topic", "test", - "--key-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer", - "--value-deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer", - "--consumer-property", "auto.offset.reset=latest", - "--consumer-property", "key.deserializer=org.apache.kafka.common.serialization.FloatDeserializer", - "--consumer-property", "value.deserializer=org.apache.kafka.common.serialization.FloatDeserializer", - "--consumer.config", propsFile.getAbsolutePath - ) - - val config = new ConsoleConsumer.ConsumerConfig(args) - val props = ConsoleConsumer.getNewConsumerProps(config) - - assertEquals("localhost:9092", props.getProperty("bootstrap.servers")) - assertEquals("latest", props.getProperty("auto.offset.reset")) - assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("key.deserializer")) - assertEquals("org.apache.kafka.common.serialization.DoubleDeserializer", config.formatterArgs.getProperty("value.deserializer")) - // serde settings applies to message formatter only, not the consumer itself - assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("key.deserializer")) - assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.getProperty("value.deserializer")) - } }
