KAFKA-820 Topic metadata request handling fails to return all metadata about replicas; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/08b2a37c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/08b2a37c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/08b2a37c Branch: refs/heads/trunk Commit: 08b2a37c33ed907614e9621f315bb737f9be490e Parents: 7b14eba Author: Neha Narkhede <[email protected]> Authored: Fri Mar 22 09:12:41 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Fri Mar 22 09:12:41 2013 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/admin/AdminUtils.scala | 36 ++++++++++----- .../scala/kafka/producer/BrokerPartitionInfo.scala | 10 ++-- .../main/scala/kafka/producer/ProducerPool.scala | 4 +- 3 files changed, 31 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/08b2a37c/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index b9ef4dc..f4bf3b9 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -24,6 +24,7 @@ import kafka.utils.{Logging, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ +import mutable.ListBuffer import scala.collection.mutable import kafka.common._ import scala.Some @@ -111,13 +112,14 @@ object AdminUtils extends Logging { var replicaInfo: Seq[Broker] = Nil var isrInfo: Seq[Broker] = Nil try { - try { - leaderInfo = leader match { - case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) - case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) - } - } catch { - case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e) + leaderInfo = leader match { + case Some(l) => + try { + Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) + } catch { + case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e) + } + case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) } try { replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) @@ -125,12 +127,18 @@ object AdminUtils extends Logging { } catch { case e => throw new ReplicaNotAvailableException(e) } + if(replicaInfo.size < replicas.size) + throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + + replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(",")) + if(isrInfo.size < inSyncReplicas.size) + throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " + + inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) } catch { case e => error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e) new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, - ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } } new TopicMetadata(topic, partitionMetadata) @@ -143,19 +151,23 @@ object AdminUtils extends Logging { private def getBrokerInfoFromCache(zkClient: ZkClient, cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker], brokerIds: Seq[Int]): Seq[Broker] = { - brokerIds.map { id => + var failedBrokerIds: ListBuffer[Int] = new ListBuffer() + val brokerMetadata = brokerIds.map { id => val optionalBrokerInfo = cachedBrokerInfo.get(id) optionalBrokerInfo match { - case Some(brokerInfo) => brokerInfo // return broker info from the cache + case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache case None => // fetch it from zookeeper ZkUtils.getBrokerInfo(zkClient, id) match { case Some(brokerInfo) => cachedBrokerInfo += (id -> brokerInfo) - brokerInfo - case None => throw new BrokerNotAvailableException("Failed to fetch broker info for broker " + id) + Some(brokerInfo) + case None => + failedBrokerIds += id + None } } } + brokerMetadata.filter(_.isDefined).map(_.get) } private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = { http://git-wip-us.apache.org/repos/asf/kafka/blob/08b2a37c/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index 617fc43..72597ef 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -80,12 +80,12 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, if(tmd.errorCode == ErrorMapping.NoError){ topicPartitionInfo.put(tmd.topic, tmd) } else - warn("Error while fetching metadata for topic [%s]: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) + warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass)) tmd.partitionsMetadata.foreach(pmd =>{ - if (pmd.errorCode != ErrorMapping.NoError){ - warn("Error while fetching metadata for topic partition [%s,%d]: [%s]".format(tmd.topic, pmd.partitionId, pmd), - ErrorMapping.exceptionFor(pmd.errorCode)) - } + if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) { + warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId, + ErrorMapping.exceptionFor(pmd.errorCode).getClass)) + } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata }) }) producerPool.updateProducer(topicsMetadata) http://git-wip-us.apache.org/repos/asf/kafka/blob/08b2a37c/core/src/main/scala/kafka/producer/ProducerPool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index 4970029..43df70b 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -43,9 +43,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging { private val syncProducers = new HashMap[Int, SyncProducer] private val lock = new Object() - def updateProducer(topicMetadatas: Seq[TopicMetadata]) { + def updateProducer(topicMetadata: Seq[TopicMetadata]) { val newBrokers = new collection.mutable.HashSet[Broker] - topicMetadatas.foreach(tmd => { + topicMetadata.foreach(tmd => { tmd.partitionsMetadata.foreach(pmd => { if(pmd.leader.isDefined) newBrokers+=(pmd.leader.get)
