Repository: kafka Updated Branches: refs/heads/trunk b30d68a4e -> d23785ff2
KAFKA-2738: Replica FetcherThread should connect to leader endpoint matching its inter-broker security protocol. â¦atching its inter-broker security protocol Author: Gwen Shapira <[email protected]> Reviewers: Jun Rao, Guozhang Wang Closes #428 from gwenshap/KAFKA-2738 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d23785ff Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d23785ff Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d23785ff Branch: refs/heads/trunk Commit: d23785ff2df136090e3c80a3a6df5cb85443924b Parents: b30d68a Author: Gwen Shapira <[email protected]> Authored: Thu Nov 5 09:11:24 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Nov 5 09:11:24 2015 -0800 ---------------------------------------------------------------------- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- core/src/main/scala/kafka/server/ReplicaManager.scala | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d23785ff/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index df064e4..d1c6f79 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel, try { // call replica manager to handle updating partitions to become leader or follower - val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) + val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode) // for each new leader or follower, call coordinator to handle // consumer group migration http://git-wip-us.apache.org/repos/asf/kafka/blob/d23785ff/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 89f2462..7823659 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -583,7 +583,7 @@ class ReplicaManager(val config: KafkaConfig, } } - def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): BecomeLeaderOrFollowerResult = { + def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, metadataCache: MetadataCache): BecomeLeaderOrFollowerResult = { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId, @@ -639,7 +639,7 @@ class ReplicaManager(val config: KafkaConfig, else Set.empty[Partition] val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty) - makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap) + makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.correlationId, responseMap, metadataCache) else Set.empty[Partition] @@ -731,9 +731,9 @@ class ReplicaManager(val config: KafkaConfig, private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], - leaders: Set[BrokerEndPoint], correlationId: Int, - responseMap: mutable.Map[(String, Int), Short]) : Set[Partition] = { + responseMap: mutable.Map[(String, Int), Short], + metadataCache: MetadataCache) : Set[Partition] = { partitionState.foreach { state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partition %s") @@ -751,7 +751,7 @@ class ReplicaManager(val config: KafkaConfig, partitionState.foreach{ case (partition, partitionStateInfo) => val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader - leaders.find(_.id == newLeaderBrokerId) match { + metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { // Only change partition state when the leader is available case Some(leaderBroker) => if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) @@ -762,7 +762,7 @@ class ReplicaManager(val config: KafkaConfig, .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, partition.topic, partition.partitionId, newLeaderBrokerId)) case None => - // The leader broker should always be present in the leaderAndIsrRequest. + // The leader broker should always be present in the metadata cache. // If not, we should record the error message and abort the transition process for this partition stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" + " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.") @@ -800,7 +800,7 @@ class ReplicaManager(val config: KafkaConfig, // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => new TopicAndPartition(partition) -> BrokerAndInitialOffset( - leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, + metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol), partition.getReplica().get.logEndOffset.messageOffset)).toMap replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
