This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 5681309 KAFKA-6764: Improve the whitelist command-line option for console-consumer (#5637) 5681309 is described below commit 5681309094b114c65018c6951f23eca88c329e03 Author: Suman <sumannew...@gmail.com> AuthorDate: Tue Oct 16 08:44:17 2018 +0530 KAFKA-6764: Improve the whitelist command-line option for console-consumer (#5637) --- .../main/scala/kafka/tools/ConsoleConsumer.scala | 8 ++- .../unit/kafka/tools/ConsoleConsumerTest.scala | 63 ++++++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 365652a..06705d5 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -65,6 +65,7 @@ object ConsoleConsumer extends Logging { def run(conf: ConsumerConfig) { val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer) + val consumerWrapper = if (conf.partitionArg.isDefined) new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs) @@ -194,7 +195,7 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") + val whitelistOpt = parser.accepts("whitelist", "Regular expression specifying whitelist of topics to include for consumption.") .withRequiredArg .describedAs("whitelist") .ofType(classOf[String]) @@ -355,7 +356,7 @@ object ConsoleConsumer extends Logging { val groupIdsProvided = Set( Option(options.valueOf(groupIdOpt)), // via --group Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property - 
Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --cosumer.config + Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config ).flatten if (groupIdsProvided.size > 1) { @@ -376,6 +377,9 @@ object ConsoleConsumer extends Logging { groupIdPassed = false } + if (groupIdPassed && partitionArg.isDefined) + CommandLineUtils.printUsageAndDie(parser, "Options group and partition cannot be specified together.") + def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { try parser.parse(args: _*) diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 47b7fae..cdc146f 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -432,4 +432,67 @@ class ConsoleConsumerTest { assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey) } + @Test + def shouldParseGroupIdFromBeginningGivenTogether() { + // Start from earliest + var args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--from-beginning") + + var config = new ConsoleConsumer.ConsumerConfig(args) + assertEquals("localhost:9092", config.bootstrapServer) + assertEquals("test", config.topicArg) + assertEquals(-2, config.offsetArg) + assertEquals(true, config.fromBeginning) + + // Start from latest + args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group" + ) + + config = new ConsoleConsumer.ConsumerConfig(args) + assertEquals("localhost:9092", config.bootstrapServer) + assertEquals("test", config.topicArg) + assertEquals(-1, config.offsetArg) + assertEquals(false, config.fromBeginning) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldExitOnGroupIdAndPartitionGivenTogether() { + Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull)) + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--partition", "0") + + //When + try { + new ConsoleConsumer.ConsumerConfig(args) + } finally { + Exit.resetExitProcedure() + } + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldExitOnOffsetWithoutPartition() { + Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--offset", "10") + + //When + try { + new ConsoleConsumer.ConsumerConfig(args) + } finally { + Exit.resetExitProcedure() + } + } }