Repository: kafka Updated Branches: refs/heads/trunk a071e3554 -> 60a5a523b
KAFKA-3085; BrokerChangeListener computes inconsistent live/dead broker list. Follow up PR as per comments in the ticket. junrao It should be correct now as `curBrokers` included only live brokers and live/dead brokers are computed based on it. Could you take a look when you have time? Author: David Jacot <david.ja...@gmail.com> Reviewers: Jun Rao <jun...@gmail.com> Closes #756 from dajac/KAFKA-3085 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/60a5a523 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/60a5a523 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/60a5a523 Branch: refs/heads/trunk Commit: 60a5a523b6f7e5151d8cdc835845cfc4d2e72b52 Parents: a071e35 Author: David Jacot <david.ja...@gmail.com> Authored: Sun Jan 17 21:22:36 2016 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Sun Jan 17 21:22:36 2016 -0800 ---------------------------------------------------------------------- .../src/main/scala/kafka/controller/ReplicaStateMachine.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/60a5a523/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 8eba704..7ebece7 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -357,13 +357,12 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { if (hasStarted.get) { ControllerStats.leaderElectionTimer.time { try { + val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo) + val curBrokerIds = curBrokers.map(_.id) val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds - val curBrokerIds = currentBrokerList.map(_.toInt).toSet val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds - val curBrokers = curBrokerIds.flatMap(zkUtils.getBrokerInfo) - val brokerById = curBrokers.map(broker => broker.id -> broker).toMap - val newBrokers = newBrokerIds.flatMap(brokerById.get) + val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id)) controllerContext.liveBrokers = curBrokers info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s" .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))