Updated Branches:
  refs/heads/0.8 c39d37e9d -> bf31a6bf7
kafka-969; Need to prevent failure of rebalance when there are no brokers available when consumer comes up; patched by Sriram Subramanian; reviewed by Joel Koshy and Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf31a6bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf31a6bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf31a6bf

Branch: refs/heads/0.8
Commit: bf31a6bf705f0438dfc15320ff7047a62eb8f94d
Parents: c39d37e
Author: Sriram Subramanian <sri...@gmail.com>
Authored: Wed Jul 10 22:50:28 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Jul 10 22:50:28 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala | 152 ++++++++++---------
 1 file changed, 81 insertions(+), 71 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/bf31a6bf/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index d952187..e3944d5 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -401,81 +401,91 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
     val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
     val brokers = getAllBrokersInCluster(zkClient)
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
-                                                        brokers,
-                                                        config.clientId,
-                                                        config.socketTimeoutMs,
-                                                        correlationId.getAndIncrement).topicsMetadata
-    val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-    topicsMetadata.foreach(m => {
-      val topic = m.topic
-      val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
-      partitionsPerTopicMap.put(topic, partitions)
-    })
-
-    /**
-     * fetchers must be stopped to avoid data duplication, since if the current
-     * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
-     * But if we don't stop the fetchers first, this consumer would continue returning data for released
-     * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
-     */
-    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
-
-    releasePartitionOwnership(topicRegistry)
-
-    var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
-    val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-
-    for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
-      currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
-
-      val topicDirs = new ZKGroupTopicDirs(group, topic)
-      val curConsumers = consumersPerTopicMap.get(topic).get
-      val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
-
-      val nPartsPerConsumer = curPartitions.size / curConsumers.size
-      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
-
-      info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
-        " for topic " + topic + " with consumers: " + curConsumers)
-
-      for (consumerThreadId <- consumerThreadIdSet) {
-        val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
-        assert(myConsumerPosition >= 0)
-        val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
-        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
-
-        /**
-         * Range-partition the sorted partitions to consumers for better locality.
-         * The first few consumers pick up an extra partition, if any.
-         */
-        if (nParts <= 0)
-          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
-        else {
-          for (i <- startPart until startPart + nParts) {
-            val partition = curPartitions(i)
-            info(consumerThreadId + " attempting to claim partition " + partition)
-            addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
-            // record the partition ownership decision
-            partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+    if (brokers.size == 0) {
+      // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
+      // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
+      // are up.
+      warn("no brokers found when trying to rebalance.")
+      zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
+      true
+    }
+    else {
+      val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
+                                                          brokers,
+                                                          config.clientId,
+                                                          config.socketTimeoutMs,
+                                                          correlationId.getAndIncrement).topicsMetadata
+      val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
+      topicsMetadata.foreach(m => {
+        val topic = m.topic
+        val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
+        partitionsPerTopicMap.put(topic, partitions)
+      })
+
+      /**
+       * fetchers must be stopped to avoid data duplication, since if the current
+       * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
+       * But if we don't stop the fetchers first, this consumer would continue returning data for released
+       * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+       */
+      closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
+
+      releasePartitionOwnership(topicRegistry)
+
+      var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
+      val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+
+      for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
+        currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
+
+        val topicDirs = new ZKGroupTopicDirs(group, topic)
+        val curConsumers = consumersPerTopicMap.get(topic).get
+        val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
+
+        val nPartsPerConsumer = curPartitions.size / curConsumers.size
+        val nConsumersWithExtraPart = curPartitions.size % curConsumers.size
+
+        info("Consumer " + consumerIdString + " rebalancing the following partitions: " + curPartitions +
+          " for topic " + topic + " with consumers: " + curConsumers)
+
+        for (consumerThreadId <- consumerThreadIdSet) {
+          val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+          assert(myConsumerPosition >= 0)
+          val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
+          val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
+
+          /**
+           * Range-partition the sorted partitions to consumers for better locality.
+           * The first few consumers pick up an extra partition, if any.
+           */
+          if (nParts <= 0)
+            warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
+          else {
+            for (i <- startPart until startPart + nParts) {
+              val partition = curPartitions(i)
+              info(consumerThreadId + " attempting to claim partition " + partition)
+              addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
+              // record the partition ownership decision
+              partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+            }
           }
         }
       }
-    }
 
-    /**
-     * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
-     * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
-     */
-    if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
-      info("Updating the cache")
-      debug("Partitions per topic cache " + partitionsPerTopicMap)
-      debug("Consumers per topic cache " + consumersPerTopicMap)
-      topicRegistry = currentTopicRegistry
-      updateFetcher(cluster)
-      true
-    } else {
-      false
+      /**
+       * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+       * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
+       */
+      if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
+        info("Updating the cache")
+        debug("Partitions per topic cache " + partitionsPerTopicMap)
+        debug("Consumers per topic cache " + consumersPerTopicMap)
+        topicRegistry = currentTopicRegistry
+        updateFetcher(cluster)
+        true
+      } else {
+        false
+      }
     }
   }
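----------------------------------------------------------------------

The heart of the fix is the new zero-broker guard: if the broker registry is empty when the rebalance starts, the consumer no longer fails the attempt; it logs a warning, subscribes loadBalancerListener for child changes on the broker ids path, and reports success so that the next broker registration re-triggers the rebalance. Below is a minimal standalone sketch of that pattern against the I0Itec zkclient API that Kafka 0.8 builds on; BrokerWatchSketch, rebalanceOnce, and triggerRebalance are illustrative names, not part of the patch.

import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

// Sketch of the recovery pattern introduced by the patch, assuming a plain
// I0Itec ZkClient. triggerRebalance stands in for the connector's
// loadBalancerListener; all names here are hypothetical.
class BrokerWatchSketch(zkClient: ZkClient, triggerRebalance: () => Unit) {
  private val BrokerIdsPath = "/brokers/ids" // the path ZkUtils.BrokerIdsPath refers to

  def rebalanceOnce(): Boolean = {
    val brokers = zkClient.getChildren(BrokerIdsPath) // java.util.List[String]
    if (brokers.isEmpty) {
      // No live brokers yet: instead of failing the rebalance, watch the
      // broker registry and re-drive the rebalance once a broker registers.
      zkClient.subscribeChildChanges(BrokerIdsPath, new IZkChildListener {
        override def handleChildChange(parentPath: String,
                                       currentChilds: java.util.List[String]): Unit = {
          if (currentChilds != null && !currentChilds.isEmpty)
            triggerRebalance()
        }
      })
      true // deferred success: the watch will retry for us
    } else {
      // ...normal metadata fetch and partition assignment, as in the diff above...
      true
    }
  }
}

Returning true in the empty-broker case is deliberate: the rebalance is treated as deferred rather than failed, so the consumer does not burn through its bounded rebalance retries while the cluster is still coming up.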
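----------------------------------------------------------------------

The assignment arithmetic itself is unchanged by the patch (the diff only re-indents it into the new else branch), and it can be read in isolation: each consumer thread deterministically computes its own contiguous range of the sorted partition list. The following self-contained sketch reproduces that arithmetic; assignRange and the sample values are illustrative only.

// Minimal, self-contained sketch of the range-assignment arithmetic used in
// the rebalance loop above. assignRange is a hypothetical helper; the real
// code computes the same ranges inline per consumer thread.
object RangeAssignmentSketch {
  def assignRange(partitions: Vector[Int], consumers: Vector[String]): Map[String, Vector[Int]] = {
    val nPartsPerConsumer = partitions.size / consumers.size       // even share
    val nConsumersWithExtraPart = partitions.size % consumers.size // leftovers
    consumers.zipWithIndex.map { case (consumer, pos) =>
      // The first nConsumersWithExtraPart consumers each take one extra partition.
      val startPart = nPartsPerConsumer * pos + pos.min(nConsumersWithExtraPart)
      val nParts = nPartsPerConsumer + (if (pos + 1 > nConsumersWithExtraPart) 0 else 1)
      consumer -> partitions.slice(startPart, startPart + nParts)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // 5 partitions over 2 consumer threads
    // -> Map(c0-0 -> Vector(0, 1, 2), c0-1 -> Vector(3, 4))
    println(assignRange(Vector(0, 1, 2, 3, 4), Vector("c0-0", "c0-1")))
  }
}

With 5 partitions and 2 threads, nPartsPerConsumer = 2 and nConsumersWithExtraPart = 1, so thread 0 takes partitions 0-2 and thread 1 takes 3-4. Because every consumer in the group computes the same deterministic answer from the shared ZooKeeper view, each one can claim exactly its own range without any further coordination.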