Repository: kafka Updated Branches: refs/heads/trunk 52d5e8839 -> ffc0965d3
KAFKA-2746; Add support for using ConsumerGroupCommand on secure install Author: Ashish Singh <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>, Jun Rao <[email protected]> Closes #534 from SinghAsDev/KAFKA-2746 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffc0965d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffc0965d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffc0965d Branch: refs/heads/trunk Commit: ffc0965d38c364272078d771fc5ed5f8784e4012 Parents: 52d5e88 Author: Ashish Singh <[email protected]> Authored: Tue Nov 17 12:00:16 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Tue Nov 17 12:00:16 2015 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/admin/AdminClient.scala | 3 ++ .../kafka/admin/ConsumerGroupCommand.scala | 36 +++++++++----------- 2 files changed, 19 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ffc0965d/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 181080f..53b6fdb 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -13,6 +13,7 @@ package kafka.admin import java.nio.ByteBuffer +import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import kafka.common.KafkaException @@ -209,6 +210,8 @@ object AdminClient { create(new AdminConfig(config)) } + def create(props: Properties): AdminClient = create(props.asScala.toMap) + def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props)) def create(config: AdminConfig): AdminClient = { http://git-wip-us.apache.org/repos/asf/kafka/blob/ffc0965d/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 2d95767..d71499e 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -26,6 +26,7 @@ import kafka.common.{TopicAndPartition, _} import kafka.consumer.SimpleConsumer import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.security.JaasUtils @@ -75,15 +76,6 @@ object ConsumerGroupCommand { } } - private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = { - val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*""")) - require(configsToBeAdded.forall(config => config.length == 2), - "Invalid config: all configs to be added must be in the format \"key=val\".") - val props = new Properties - configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) - props - } - sealed trait ConsumerGroupService { def list(): Unit @@ -160,9 +152,9 @@ object ConsumerGroupCommand { } protected def describeGroup(group: String) { - val configs = parseConfigs(opts) - val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt - val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt + val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties() + val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", "600").toInt + val channelRetryBackoffMs = props.getProperty("channelRetryBackoffMsOpt", "300").toInt val topics = zkUtils.getTopicsByConsumerGroup(group) if (topics.isEmpty) println("No topic available for consumer group provided") @@ -352,8 +344,11 @@ object ConsumerGroupCommand { if (consumer != null) consumer.close() } - private def createAdminClient(): AdminClient = - AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt)) + private def createAdminClient(): AdminClient = { + val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) + AdminClient.create(props) + } private def getConsumer() = { if (consumer == null) @@ -371,6 +366,8 @@ object ConsumerGroupCommand { properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) + if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))) + new KafkaConsumer(properties) } @@ -390,7 +387,6 @@ object ConsumerGroupCommand { val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to." val GroupDoc = "The consumer group we wish to act on." val TopicDoc = "The topic whose consumer group information should be deleted." - val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600" val ListDoc = "List all consumer groups." val DescribeDoc = "Describe consumer group and list offset lag related to given group." val nl = System.getProperty("line.separator") @@ -402,6 +398,7 @@ object ConsumerGroupCommand { "for every consumer group. For instance --topic t1" + nl + "WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active." val NewConsumerDoc = "Use new consumer." + val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer." val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc) .withRequiredArg @@ -419,14 +416,14 @@ object ConsumerGroupCommand { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val configOpt = parser.accepts("config", ConfigDoc) - .withRequiredArg - .describedAs("name=value") - .ofType(classOf[String]) val listOpt = parser.accepts("list", ListDoc) val describeOpt = parser.accepts("describe", DescribeDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc) + val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc) + .withRequiredArg + .describedAs("command config property file") + .ofType(classOf[String]) val options = parser.parse(args : _*) val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) @@ -460,7 +457,6 @@ object ConsumerGroupCommand { // check invalid args CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt) CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt) - CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt) } } }
