KAFKA-1503; all partitions are using the same broker as their leader after a broker
is down; patched by Jianwen Wang; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a4718c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a4718c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a4718c1

Branch: refs/heads/transactional_messaging
Commit: 2a4718c1a7e8e1566c5c87468779fdd1f95fe3bc
Parents: b8d87d0
Author: Jianwen Wang <[email protected]>
Authored: Wed Jul 2 21:54:42 2014 -0700
Committer: Jun Rao <[email protected]>
Committed: Wed Jul 2 21:54:42 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/controller/PartitionLeaderSelector.scala   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4718c1/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index d3b25fa..4a31c72 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -83,7 +83,8 @@ class OfflinePartitionLeaderSelector(controllerContext: 
ControllerContext, confi
                 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, 
List(newLeader), currentLeaderIsrZkPathVersion + 1)
             }
           case false =>
-            val newLeader = liveBrokersInIsr.head
+            val liveReplicasInIsr = liveAssignedReplicas.filter(r => 
liveBrokersInIsr.contains(r))
+            val newLeader = liveReplicasInIsr.head
             debug("Some broker in ISR is alive for %s. Select %d from ISR %s 
to be the leader."
                   .format(topicAndPartition, newLeader, 
liveBrokersInIsr.mkString(",")))
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, 
liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
@@ -210,4 +211,4 @@ class NoOpLeaderSelector(controllerContext: 
ControllerContext) extends Partition
     warn("I should never have been asked to perform leader election, returning 
the current LeaderAndIsr and replica assignment.")
     (currentLeaderAndIsr, 
controllerContext.partitionReplicaAssignment(topicAndPartition))
   }
-}
\ No newline at end of file
+}

Reply via email to