Updated Branches: refs/heads/trunk dd58d753c -> b5d16871c
KAFKA-1139 Topic data change handling callback should not call syncedRebalance directly; reviewed by Guozhang Wang and Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5d16871
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5d16871
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5d16871

Branch: refs/heads/trunk
Commit: b5d16871c02a585643aec3229546af04721bb42c
Parents: dd58d75
Author: Neha Narkhede <neha.narkh...@gmail.com>
Authored: Thu Dec 19 12:54:10 2013 -0800
Committer: Neha Narkhede <neha.narkh...@gmail.com>
Committed: Thu Dec 19 12:54:10 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 112 ++++++++++---------
 .../consumer/ZookeeperConsumerConnector.scala   |  19 ++--
 2 files changed, 72 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b5d16871/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index d25aae3..083fd63 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -25,6 +25,7 @@ import scala.collection._
 import scala.collection.JavaConversions._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
+import kafka.consumer.Whitelist
 
 object TopicCommand {
 
@@ -43,67 +44,79 @@ object TopicCommand {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt)
 
     val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer)
-
-    if(opts.options.has(opts.createOpt))
-      createTopic(zkClient, opts)
-    else if(opts.options.has(opts.alterOpt))
-      alterTopic(zkClient, opts)
-    else if(opts.options.has(opts.deleteOpt))
-      deleteTopic(zkClient, opts)
-    else if(opts.options.has(opts.listOpt))
-      listTopics(zkClient, opts)
-    else if(opts.options.has(opts.describeOpt))
-      describeTopic(zkClient, opts)
-    zkClient.close()
+    try {
+      if(opts.options.has(opts.createOpt))
+        createTopic(zkClient, opts)
+      else if(opts.options.has(opts.alterOpt))
+        alterTopic(zkClient, opts)
+      else if(opts.options.has(opts.deleteOpt))
+        deleteTopic(zkClient, opts)
+      else if(opts.options.has(opts.listOpt))
+        listTopics(zkClient, opts)
+      else if(opts.options.has(opts.describeOpt))
+        describeTopic(zkClient, opts)
+    } catch {
+      case e => println("Error while executing topic command", e)
+    } finally {
+      zkClient.close()
+    }
+  }
+
+  private def getTopics(zkClient: ZkClient, opts: TopicCommandOptions): Seq[String] = {
+    val topicsSpec = opts.options.valueOf(opts.topicOpt)
+    val topicsFilter = new Whitelist(topicsSpec)
+    val allTopics = ZkUtils.getAllTopics(zkClient)
+    allTopics.filter(topicsFilter.isTopicAllowed).sorted
   }
 
   def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    val topics = opts.options.valuesOf(opts.topicOpt)
+    val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
-    for (topic <- topics) {
-      if (opts.options.has(opts.replicaAssignmentOpt)) {
-        val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-        AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
-      } else {
-        CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
-        val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
-        val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-        AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
-      }
-      println("Created topic \"%s\".".format(topic))
+    if (opts.options.has(opts.replicaAssignmentOpt)) {
+      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
+    } else {
+      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
+      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
+      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
+      AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
     }
+    println("Created topic \"%s\".".format(topic))
   }
-
+
   def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    val topic = opts.options.valueOf(opts.topicOpt)
-    if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
-      val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
-      val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
-      // compile the final set of configs
-      val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
-      configs.putAll(configsToBeAdded)
-      configsToBeDeleted.foreach(config => configs.remove(config))
-      AdminUtils.changeTopicConfig(zkClient, topic, configs)
-      println("Updated config for topic \"%s\".".format(topic))
-    }
-    if(opts.options.has(opts.partitionsOpt)) {
-      println("WARNING: If partitions are increased for a topic that has a key, the partition " +
-        "logic or ordering of the messages will be affected")
-      val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
-      val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
-      AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
-      println("adding partitions succeeded!")
+    val topics = getTopics(zkClient, opts)
+    topics.foreach { topic =>
+      if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
+        val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+        val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+        // compile the final set of configs
+        val configs = AdminUtils.fetchTopicConfig(zkClient, topic)
+        configs.putAll(configsToBeAdded)
+        configsToBeDeleted.foreach(config => configs.remove(config))
+        AdminUtils.changeTopicConfig(zkClient, topic, configs)
+        println("Updated config for topic \"%s\".".format(topic))
+      }
+      if(opts.options.has(opts.partitionsOpt)) {
+        println("WARNING: If partitions are increased for a topic that has a key, the partition " +
+          "logic or ordering of the messages will be affected")
+        val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
+        val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
+        AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+        println("adding partitions succeeded!")
+      }
+      if(opts.options.has(opts.replicationFactorOpt))
+        Utils.croak("Changing the replication factor is not supported.")
     }
-    if(opts.options.has(opts.replicationFactorOpt))
-      Utils.croak("Changing the replication factor is not supported.")
   }
 
   def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    for(topic <- opts.options.valuesOf(opts.topicOpt)) {
+    val topics = getTopics(zkClient, opts)
+    topics.foreach { topic =>
       AdminUtils.deleteTopic(zkClient, topic)
       println("Topic \"%s\" deleted.".format(topic))
     }
@@ -128,9 +141,7 @@ object TopicCommand {
   }
 
   def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-    var topics: Seq[String] = opts.options.valuesOf(opts.topicOpt).toSeq.sorted
-    if (topics.size <= 0)
-      topics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).sorted
+    var topics = getTopics(zkClient, opts)
     val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false
     val reportUnavailablePartitions = if (opts.options.has(opts.reportUnavailablePartitionsOpt)) true else false
     val liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).map(_.id).toSet
@@ -212,7 +223,8 @@ object TopicCommand {
     val deleteOpt = parser.accepts("delete", "Delete the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")
-    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe.")
+    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " +
+                                           "expression except for --create option")
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/b5d16871/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0cc236a..703b2e2 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private val messageStreamCreated = new AtomicBoolean(false)
 
   private var sessionExpirationListener: ZKSessionExpireListener = null
-  private var topicPartitionChangeListenner: ZKTopicPartitionChangeListener = null
+  private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
   private var loadBalancerListener: ZKRebalancerListener = null
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
@@ -302,7 +302,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       registerConsumerInZK(dirs, consumerIdString, topicCount)
       // explicitly trigger load balancing for this consumer
       loadBalancerListener.syncedRebalance()
-
      // There is no need to resubscribe to child and state changes.
      // The child change watchers will be set inside rebalance when we read the children list.
    }
@@ -315,9 +314,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     def handleDataChange(dataPath : String, data: Object) {
       try {
         info("Topic info for path " + dataPath + " changed to " + data.toString + ", triggering rebalance")
-        // explicitly trigger load balancing for this consumer
-        loadBalancerListener.syncedRebalance()
-
+        // queue up the rebalance event
+        loadBalancerListener.rebalanceEventTriggered()
         // There is no need to re-subscribe the watcher since it will be automatically
         // re-registered upon firing of this event by zkClient
       } catch {
@@ -335,7 +333,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
-    private val correlationId = new AtomicInteger(0)
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
@@ -367,6 +364,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      rebalanceEventTriggered()
+    }
+
+    def rebalanceEventTriggered() {
       inLock(lock) {
         isWatcherTriggered = true
         cond.signalAll()
@@ -655,8 +656,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       dirs, consumerIdString, topicCount, loadBalancerListener)
 
     // create listener for topic partition change event if not exist yet
-    if (topicPartitionChangeListenner == null)
-      topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener)
+    if (topicPartitionChangeListener == null)
+      topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
 
     val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
 
@@ -714,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     topicStreamsMap.foreach { topicAndStreams =>
       // register on broker partition path changes
       val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
-      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListenner)
+      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
     }
 
     // explicitly trigger load balancing for this consumer
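
Note on the ZookeeperConsumerConnector change: handleDataChange now only calls rebalanceEventTriggered(), which sets the isWatcherTriggered flag and signals a condition variable, so the (potentially slow) syncedRebalance() no longer runs on the ZkClient event thread. Below is a minimal, self-contained Scala sketch of that signal-and-defer pattern, assuming a dedicated watcher thread as in the real connector; the RebalanceSignal class, the watcherLoop/shutdown methods, and the demo object are illustrative names, not the actual Kafka code.

import java.util.concurrent.locks.ReentrantLock

// Sketch of the pattern above: the ZK callback only records that a rebalance is
// needed and signals a condition; a separate thread performs the rebalance work.
class RebalanceSignal {
  private val lock = new ReentrantLock
  private val cond = lock.newCondition()
  private var isWatcherTriggered = false
  @volatile private var running = true

  // In the real connector this is what handleDataChange/handleChildChange call;
  // it records the event and returns immediately.
  def rebalanceEventTriggered(): Unit = {
    lock.lock()
    try {
      isWatcherTriggered = true
      cond.signalAll()
    } finally lock.unlock()
  }

  def shutdown(): Unit = {
    running = false
    rebalanceEventTriggered()   // wake the watcher thread so it can exit
  }

  // Runs on a dedicated thread and owns the actual rebalance work
  // (syncedRebalance() in the real connector).
  def watcherLoop(doRebalance: () => Unit): Unit = {
    while (running) {
      var triggered = false
      lock.lock()
      try {
        while (!isWatcherTriggered && running)
          cond.await()
        triggered = isWatcherTriggered
        isWatcherTriggered = false
      } finally lock.unlock()
      if (triggered && running)
        doRebalance()
    }
  }
}

// Hypothetical usage: the callback thread signals, the watcher thread rebalances.
object RebalanceSignalDemo extends App {
  val signal = new RebalanceSignal
  val watcher = new Thread(new Runnable {
    def run(): Unit = signal.watcherLoop(() => println("rebalancing..."))
  })
  watcher.start()
  signal.rebalanceEventTriggered()
  Thread.sleep(200)
  signal.shutdown()
  watcher.join()
}

Because the callback returns immediately, repeated topic-partition data changes simply re-set the flag and are coalesced into a single rebalance pass by the watcher thread, instead of each change blocking the ZooKeeper event thread for a full syncedRebalance.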