KAFKA-1503; all partitions are using the same broker as their leader after a broker is down; patched by Jianwen Wang; reviewed by Guozhang Wang and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a4718c1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a4718c1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a4718c1 Branch: refs/heads/transactional_messaging Commit: 2a4718c1a7e8e1566c5c87468779fdd1f95fe3bc Parents: b8d87d0 Author: Jianwen Wang <[email protected]> Authored: Wed Jul 2 21:54:42 2014 -0700 Committer: Jun Rao <[email protected]> Committed: Wed Jul 2 21:54:42 2014 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/controller/PartitionLeaderSelector.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4718c1/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index d3b25fa..4a31c72 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -83,7 +83,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) } case false => - val newLeader = liveBrokersInIsr.head + val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r)) + val newLeader = liveReplicasInIsr.head debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." 
.format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) @@ -210,4 +211,4 @@ class NoOpLeaderSelector(controllerContext: ControllerContext) extends Partition warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.") (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition)) } -} \ No newline at end of file +}
