Updated Branches:
  refs/heads/0.8 c39d37e9d -> bf31a6bf7

KAFKA-969; Prevent rebalance failure when there are no brokers available when
the consumer comes up; patched by Sriram Subramanian; reviewed by
Joel Koshy and Jun Rao
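
For context, the essence of the patch as a standalone sketch (the object, method, and listener names below are illustrative, not the actual ZookeeperConsumerConnector code): if the broker registry in ZooKeeper is empty when the consumer attempts to rebalance, the consumer now logs a warning and subscribes to child changes on the broker ids path instead of failing, so that a later broker registration retriggers the rebalance.

import java.util
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

// Sketch only: mirrors the shape of the guard added in this commit.
object NoBrokerRebalanceGuard {
  // Same path that ZkUtils.BrokerIdsPath points at.
  val BrokerIdsPath = "/brokers/ids"

  // Returns true when the rebalance attempt is considered settled for now.
  def rebalanceOrWaitForBrokers(zkClient: ZkClient, rebalanceListener: IZkChildListener): Boolean = {
    val brokers = zkClient.getChildren(BrokerIdsPath)
    if (brokers.isEmpty) {
      // No brokers registered yet: don't fail; watch the broker ids path so the
      // listener can trigger another rebalance once a broker comes up.
      zkClient.subscribeChildChanges(BrokerIdsPath, rebalanceListener)
      true
    } else {
      // Normal path: fetch topic metadata from the brokers and assign partitions
      // (omitted here; see the diff below).
      true
    }
  }

  // A minimal listener that would kick off another rebalance attempt.
  def listener(onBrokersChanged: () => Unit): IZkChildListener = new IZkChildListener {
    override def handleChildChange(parentPath: String, currentChilds: util.List[String]): Unit =
      onBrokersChanged()
  }
}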


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf31a6bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf31a6bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf31a6bf

Branch: refs/heads/0.8
Commit: bf31a6bf705f0438dfc15320ff7047a62eb8f94d
Parents: c39d37e
Author: Sriram Subramanian <sri...@gmail.com>
Authored: Wed Jul 10 22:50:28 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Jul 10 22:50:28 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 152 ++++++++++---------
 1 file changed, 81 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bf31a6bf/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index d952187..e3944d5 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -401,81 +401,91 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
       val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
       val brokers = getAllBrokersInCluster(zkClient)
-      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
-                                                          brokers,
-                                                          config.clientId,
-                                                          config.socketTimeoutMs,
-                                                          correlationId.getAndIncrement).topicsMetadata
-      val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-      topicsMetadata.foreach(m => {
-        val topic = m.topic
-        val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
-        partitionsPerTopicMap.put(topic, partitions)
-      })
-
-      /**
-       * fetchers must be stopped to avoid data duplication, since if the current
-       * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
-       * But if we don't stop the fetchers first, this consumer would continue returning data for released
-       * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
-       */
-      closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
-
-      releasePartitionOwnership(topicRegistry)
-
-      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
-      val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-
-      for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
-        currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
-
-        val topicDirs = new ZKGroupTopicDirs(group, topic)
-        val curConsumers = consumersPerTopicMap.get(topic).get
-        val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
-
-        val nPartsPerConsumer = curPartitions.size / curConsumers.size
-        val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
-
-        info("Consumer " + consumerIdString + " rebalancing the following 
partitions: " + curPartitions +
-          " for topic " + topic + " with consumers: " + curConsumers)
-
-        for (consumerThreadId <- consumerThreadIdSet) {
-          val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
-          assert(myConsumerPosition >= 0)
-          val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
-          val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
-
-          /**
-           *   Range-partition the sorted partitions to consumers for better locality.
-           *  The first few consumers pick up an extra partition, if any.
-           */
-          if (nParts <= 0)
-            warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
-          else {
-            for (i <- startPart until startPart + nParts) {
-              val partition = curPartitions(i)
-              info(consumerThreadId + " attempting to claim partition " + partition)
-              addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
-              // record the partition ownership decision
-              partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+      if (brokers.size == 0) {
+        // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
+        // We log a warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
+        // are up.
+        warn("no brokers found when trying to rebalance.")
+        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
+        true
+      }
+      else {
+        val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
+                                                            brokers,
+                                                            config.clientId,
+                                                            config.socketTimeoutMs,
+                                                            correlationId.getAndIncrement).topicsMetadata
+        val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
+        topicsMetadata.foreach(m => {
+          val topic = m.topic
+          val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
+          partitionsPerTopicMap.put(topic, partitions)
+        })
+
+        /**
+         * fetchers must be stopped to avoid data duplication, since if the current
+         * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
+         * But if we don't stop the fetchers first, this consumer would continue returning data for released
+         * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+         */
+        closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
+
+        releasePartitionOwnership(topicRegistry)
+
+        var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
+        val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+
+        for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
+          currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
+
+          val topicDirs = new ZKGroupTopicDirs(group, topic)
+          val curConsumers = consumersPerTopicMap.get(topic).get
+          val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
+
+          val nPartsPerConsumer = curPartitions.size / curConsumers.size
+          val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
+
+          info("Consumer " + consumerIdString + " rebalancing the following 
partitions: " + curPartitions +
+            " for topic " + topic + " with consumers: " + curConsumers)
+
+          for (consumerThreadId <- consumerThreadIdSet) {
+            val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+            assert(myConsumerPosition >= 0)
+            val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+            val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+            /**
+             *   Range-partition the sorted partitions to consumers for better locality.
+             *  The first few consumers pick up an extra partition, if any.
+             */
+            if (nParts <= 0)
+              warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+            else {
+              for (i <- startPart until startPart + nParts) {
+                val partition = curPartitions(i)
+                info(consumerThreadId + " attempting to claim partition " + partition)
+                addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
+                // record the partition ownership decision
+                partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+              }
             }
           }
         }
-      }
 
-      /**
-       * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
-       * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
-       */
-      if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
-        info("Updating the cache")
-        debug("Partitions per topic cache " + partitionsPerTopicMap)
-        debug("Consumers per topic cache " + consumersPerTopicMap)
-        topicRegistry = currentTopicRegistry
-        updateFetcher(cluster)
-        true
-      } else {
-        false
+        /**
+         * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+         * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
+         */
+        if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
+          info("Updating the cache")
+          debug("Partitions per topic cache " + partitionsPerTopicMap)
+          debug("Consumers per topic cache " + consumersPerTopicMap)
+          topicRegistry = currentTopicRegistry
+          updateFetcher(cluster)
+          true
+        } else {
+          false
+        }
       }
     }
 

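As a footnote on the assignment logic that this diff only re-indents (the "Range-partition the sorted partitions to consumers for better locality" block), here is a self-contained sketch of the same arithmetic; the object and method names are invented for illustration. The first (numPartitions % numConsumers) consumer threads take one extra partition each, which keeps every thread's range contiguous.

// Sketch only: the same range-assignment math as the rebalance loop in the diff.
object RangeAssignmentSketch {
  def assignedRange(myPosition: Int, numPartitions: Int, numConsumers: Int): Range = {
    val nPartsPerConsumer = numPartitions / numConsumers
    val nConsumersWithExtraPart = numPartitions % numConsumers
    val startPart = nPartsPerConsumer * myPosition + math.min(myPosition, nConsumersWithExtraPart)
    val nParts = nPartsPerConsumer + (if (myPosition + 1 > nConsumersWithExtraPart) 0 else 1)
    startPart until (startPart + nParts)
  }

  def main(args: Array[String]): Unit = {
    // 5 partitions over 2 consumer threads: thread 0 owns partitions 0, 1, 2
    // and thread 1 owns partitions 3, 4.
    println(assignedRange(0, numPartitions = 5, numConsumers = 2).toList)
    println(assignedRange(1, numPartitions = 5, numConsumers = 2).toList)
  }
}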