Repository: kafka Updated Branches: refs/heads/trunk 36242b846 -> 1d055f755
KAFKA-3282; Change tools to use new consumer if zookeeper is not specified Author: Arun Mahadevan <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #1376 from arunmahadevan/cons-consumer-fix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d055f75 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d055f75 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d055f75 Branch: refs/heads/trunk Commit: 1d055f7551d138324d2540095a1cfc1c8f74d76f Parents: 36242b8 Author: Arun Mahadevan <[email protected]> Authored: Sun Sep 25 08:44:56 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Sun Sep 25 09:12:02 2016 +0100 ---------------------------------------------------------------------- .../kafka/admin/ConsumerGroupCommand.scala | 34 ++++++++-------- .../scala/kafka/tools/ConsoleConsumer.scala | 41 ++++++++++++-------- .../scala/kafka/tools/ConsumerPerformance.scala | 17 +++++--- .../unit/kafka/tools/ConsoleConsumerTest.scala | 32 +++++++++++---- 4 files changed, 76 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1d055f75/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index f0c817f..1cc63b1 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -54,8 +54,11 @@ object ConsumerGroupCommand { opts.checkArgs() val consumerGroupService = { - if (opts.options.has(opts.newConsumerOpt)) new KafkaConsumerGroupService(opts) - else new ZkConsumerGroupService(opts) + if (opts.useOldConsumer) { + new ZkConsumerGroupService(opts) + } else { + new KafkaConsumerGroupService(opts) + } } try { @@ -376,9 +379,9 @@ object ConsumerGroupCommand { } class ConsumerGroupCommandOptions(args: Array[String]) { - val ZkConnectDoc = "REQUIRED (unless new-consumer is used): The connection string for the zookeeper connection in the form host:port. " + + val ZkConnectDoc = "REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over." - val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to." + val BootstrapServerDoc = "REQUIRED (unless old consumer is used): The server to connect to." val GroupDoc = "The consumer group we wish to act on." val TopicDoc = "The topic whose consumer group information should be deleted." val ListDoc = "List all consumer groups." @@ -391,7 +394,7 @@ object ConsumerGroupCommand { "Pass in just a topic to delete the given topic's partition offsets and ownership information " + "for every consumer group. For instance --topic t1" + nl + "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active." - val NewConsumerDoc = "Use new consumer." + val NewConsumerDoc = "Use new consumer. This is the default." val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer." val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) @@ -420,27 +423,24 @@ object ConsumerGroupCommand { .ofType(classOf[String]) val options = parser.parse(args : _*) + val useOldConsumer = options.has(zkConnectOpt) + val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) def checkArgs() { // check required args - if (options.has(newConsumerOpt)) { + if (useOldConsumer) { + if (options.has(bootstrapServerOpt)) + CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.") + else if (options.has(newConsumerOpt)) + CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.") + } else { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) - if (options.has(zkConnectOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt") - if (options.has(deleteOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " + + CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is only valid with $zkConnectOpt. Note that " + "there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " + "member leaves") - - } else { - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) - - if (options.has(bootstrapServerOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt") - } if (options.has(describeOpt)) http://git-wip-us.apache.org/repos/asf/kafka/blob/1d055f75/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 17cf5bd..361bef2 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -58,15 +58,15 @@ object ConsoleConsumer extends Logging { def run(conf: ConsumerConfig) { val consumer = - if (conf.useNewConsumer) { + if (conf.useOldConsumer) { + checkZk(conf) + new OldConsumer(conf.filterSpec, getOldConsumerProps(conf)) + } else { val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue if (conf.partitionArg.isDefined) new NewShinyConsumer(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, getNewConsumerProps(conf), timeoutMs) else new NewShinyConsumer(Option(conf.topicArg), None, None, Option(conf.whitelistArg), getNewConsumerProps(conf), timeoutMs) - } else { - checkZk(conf) - new OldConsumer(conf.filterSpec, getOldConsumerProps(conf)) } addShutdownHook(consumer, conf) @@ -224,7 +224,7 @@ object ConsoleConsumer extends Logging { .describedAs("consume offset") .ofType(classOf[String]) .defaultsTo("latest") - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over.") .withRequiredArg .describedAs("urls") @@ -265,8 +265,8 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("metrics directory") .ofType(classOf[java.lang.String]) - val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.") - val bootstrapServerOpt = parser.accepts("bootstrap-server") + val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. This is the default.") + val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED (unless old consumer is used): The server to connect to.") .withRequiredArg .describedAs("server to connect to") .ofType(classOf[String]) @@ -287,7 +287,7 @@ object ConsoleConsumer extends Logging { var groupIdPassed = true val options: OptionSet = tryParse(parser, args) - val useNewConsumer = options.has(useNewConsumerOpt) + val useOldConsumer = options.has(zkConnectOpt) val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt) // If using old consumer, exactly one of whitelist/blacklist/topic is required. @@ -314,21 +314,27 @@ object ConsoleConsumer extends Logging { val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) - if (useNewConsumer) { - val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has) - if (topicOrFilterOpt.size != 1) - CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.") - topicArg = options.valueOf(topicIdOpt) - whitelistArg = options.valueOf(whitelistOpt) - } else { + if (useOldConsumer) { + if (options.has(bootstrapServerOpt)) + CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.") + else if (options.has(newConsumerOpt)) + CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.") val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) if (topicOrFilterOpt.size != 1) CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.") topicArg = options.valueOf(topicOrFilterOpt.head) filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) + Console.err.println("Using the ConsoleConsumer with old consumer is deprecated and will be removed " + + s"in a future major release. Consider using the new consumer by passing $bootstrapServerOpt instead of ${zkConnectOpt}.") + } else { + val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has) + if (topicOrFilterOpt.size != 1) + CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/topic is required.") + topicArg = options.valueOf(topicIdOpt) + whitelistArg = options.valueOf(whitelistOpt) } - if (!useNewConsumer && (partitionArg.isDefined || options.has(offsetOpt))) + if (useOldConsumer && (partitionArg.isDefined || options.has(offsetOpt))) CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.") if (partitionArg.isDefined) { @@ -361,7 +367,8 @@ object ConsoleConsumer extends Logging { else if (fromBeginning) OffsetRequest.EarliestTime else OffsetRequest.LatestTime - CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt) + if (!useOldConsumer) + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) if (options.has(csvMetricsReporterEnabledOpt)) { val csvReporterProps = new Properties() http://git-wip-us.apache.org/repos/asf/kafka/blob/1d055f75/core/src/main/scala/kafka/tools/ConsumerPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 36376bf..63a04c9 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -58,7 +58,7 @@ object ConsumerPerformance { } var startMs, endMs = 0L - if (config.useNewConsumer) { + if (!config.useOldConsumer) { val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props) consumer.subscribe(List(config.topic)) startMs = System.currentTimeMillis @@ -163,12 +163,12 @@ object ConsumerPerformance { } class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val zkConnectOpt = parser.accepts("zookeeper", "The connection string for the zookeeper connection in the form host:port. " + + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over. This option is only used with the old consumer.") .withRequiredArg .describedAs("urls") .ofType(classOf[String]) - val bootstrapServersOpt = parser.accepts("broker-list", "A broker list to use for connecting if using the new consumer.") + val bootstrapServersOpt = parser.accepts("broker-list", "REQUIRED (unless old consumer is used): A broker list to use for connecting if using the new consumer.") .withRequiredArg() .describedAs("host") .ofType(classOf[String]) @@ -203,7 +203,7 @@ object ConsumerPerformance { .describedAs("count") .ofType(classOf[java.lang.Integer]) .defaultsTo(1) - val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.") + val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. This is the default.") val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") .withRequiredArg .describedAs("config file") @@ -213,13 +213,14 @@ object ConsumerPerformance { CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt) - val useNewConsumer = options.has(useNewConsumerOpt) + val useOldConsumer = options.has(zkConnectOpt) val props = if (options.has(consumerConfigOpt)) Utils.loadProps(options.valueOf(consumerConfigOpt)) else new Properties - if (useNewConsumer) { + if (!useOldConsumer) { + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServersOpt) import org.apache.kafka.clients.consumer.ConsumerConfig props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt)) props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)) @@ -230,6 +231,10 @@ object ConsumerPerformance { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer]) props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false") } else { + if (options.has(bootstrapServersOpt)) + CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServersOpt is not valid with $zkConnectOpt.") + else if (options.has(newConsumerOpt)) + CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.") CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, numMessagesOpt) props.put("group.id", options.valueOf(groupIdOpt)) props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/1d055f75/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 63be9c4..013ed3e 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -89,7 +89,7 @@ class ConsoleConsumerTest extends JUnitSuite { val config = new ConsoleConsumer.ConsumerConfig(args) //Then - assertFalse(config.useNewConsumer) + assertTrue(config.useOldConsumer) assertEquals("localhost:2181", config.zkConnectionStr) assertEquals("test", config.topicArg) assertEquals(true, config.fromBeginning) @@ -108,14 +108,14 @@ class ConsoleConsumerTest extends JUnitSuite { val config = new ConsoleConsumer.ConsumerConfig(args) //Then - assertTrue(config.useNewConsumer) + assertFalse(config.useOldConsumer) assertEquals("localhost:9092", config.bootstrapServer) assertEquals("test", config.topicArg) assertEquals(true, config.fromBeginning) } @Test - def shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset() { + def shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset(): Unit = { //Given val args: Array[String] = Array( "--bootstrap-server", "localhost:9092", @@ -128,12 +128,28 @@ class ConsoleConsumerTest extends JUnitSuite { val config = new ConsoleConsumer.ConsumerConfig(args) //Then - assertTrue(config.useNewConsumer) + assertFalse(config.useOldConsumer) assertEquals("localhost:9092", config.bootstrapServer) assertEquals("test", config.topicArg) assertEquals(0, config.partitionArg.get) assertEquals(3, config.offsetArg) assertEquals(false, config.fromBeginning) + + } + + @Test + def testDefaultConsumer() { + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning") + + //When + val config = new ConsoleConsumer.ConsumerConfig(args) + + //Then + assertFalse(config.useOldConsumer) } @Test @@ -150,7 +166,7 @@ class ConsoleConsumerTest extends JUnitSuite { val config = new ConsoleConsumer.ConsumerConfig(args) //Then - assertTrue(config.useNewConsumer) + assertFalse(config.useOldConsumer) assertEquals("localhost:9092", config.bootstrapServer) assertEquals("test", config.topicArg) assertEquals(0, config.partitionArg.get) @@ -162,16 +178,16 @@ class ConsoleConsumerTest extends JUnitSuite { def shouldParseConfigsFromFile() { val propsFile = TestUtils.tempFile() val propsStream = new FileOutputStream(propsFile) - propsStream.write("consumer.timeout.ms=1000".getBytes()) + propsStream.write("request.timeout.ms=1000".getBytes()) propsStream.close() val args: Array[String] = Array( - "--zookeeper", "localhost:2181", + "--bootstrap-server", "localhost:9092", "--topic", "test", "--consumer.config", propsFile.getAbsolutePath ) val config = new ConsoleConsumer.ConsumerConfig(args) - assertEquals("1000", config.consumerProps.getProperty("consumer.timeout.ms")) + assertEquals("1000", config.consumerProps.getProperty("request.timeout.ms")) } }
