Repository: kafka
Updated Branches:
  refs/heads/trunk b30d68a4e -> d23785ff2


KAFKA-2738: Replica FetcherThread should connect to leader endpoint matching its inter-broker security protocol.

Author: Gwen Shapira <[email protected]>

Reviewers: Jun Rao, Guozhang Wang

Closes #428 from gwenshap/KAFKA-2738


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d23785ff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d23785ff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d23785ff

Branch: refs/heads/trunk
Commit: d23785ff2df136090e3c80a3a6df5cb85443924b
Parents: b30d68a
Author: Gwen Shapira <[email protected]>
Authored: Thu Nov 5 09:11:24 2015 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Thu Nov 5 09:11:24 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala      |  2 +-
 core/src/main/scala/kafka/server/ReplicaManager.scala | 14 +++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d23785ff/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index df064e4..d1c6f79 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     try {
       // call replica manager to handle updating partitions to become leader or follower
-      val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)
       // for each new leader or follower, call coordinator to handle
       // consumer group migration

http://git-wip-us.apache.org/repos/asf/kafka/blob/d23785ff/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 89f2462..7823659 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -583,7 +583,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): BecomeLeaderOrFollowerResult = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, metadataCache: MetadataCache): BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                 .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
@@ -639,7 +639,7 @@ class ReplicaManager(val config: KafkaConfig,
         else
           Set.empty[Partition]
         val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
+          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.correlationId, responseMap, metadataCache)
         else
           Set.empty[Partition]
 
@@ -731,9 +731,9 @@ class ReplicaManager(val config: KafkaConfig,
   private def makeFollowers(controllerId: Int,
                             epoch: Int,
                             partitionState: Map[Partition, PartitionStateInfo],
-                            leaders: Set[BrokerEndPoint],
                             correlationId: Int,
-                            responseMap: mutable.Map[(String, Int), Short]) : Set[Partition] = {
+                            responseMap: mutable.Map[(String, Int), Short],
+                            metadataCache: MetadataCache) : Set[Partition] = {
     partitionState.foreach { state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-follower transition for partition %s")
@@ -751,7 +751,7 @@ class ReplicaManager(val config: KafkaConfig,
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
         val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
-        leaders.find(_.id == newLeaderBrokerId) match {
+        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
           // Only change partition state when the leader is available
           case Some(leaderBroker) =>
             if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
@@ -762,7 +762,7 @@ class ReplicaManager(val config: KafkaConfig,
                 .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
                 partition.topic, partition.partitionId, newLeaderBrokerId))
           case None =>
-            // The leader broker should always be present in the leaderAndIsrRequest.
+            // The leader broker should always be present in the metadata cache.
             // If not, we should record the error message and abort the transition process for this partition
             stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
               " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
@@ -800,7 +800,7 @@ class ReplicaManager(val config: KafkaConfig,
         // we do not need to check if the leader exists again since this has been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
           new TopicAndPartition(partition) -> BrokerAndInitialOffset(
-            leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
+            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
             partition.getReplica().get.logEndOffset.messageOffset)).toMap
         replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
 

Reply via email to