Repository: kafka Updated Branches: refs/heads/0.10.0 c8ef1778c -> a1afbab3e
KAFKA-3670; ControlledShutdownLeaderSelector should pick the preferred replica as the new leader, if possible Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #1338 from ijuma/kafka-3670-controlled-shutdown-leader-selector-preferred-replica (cherry picked from commit 51f7a35c929d9aa04d821098a2266902f9178d7c) Signed-off-by: Jun Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a1afbab3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a1afbab3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a1afbab3 Branch: refs/heads/0.10.0 Commit: a1afbab3edadeed96825deb512a7961fbce26e1f Parents: c8ef177 Author: Ismael Juma <[email protected]> Authored: Sun May 8 10:45:47 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Sun May 8 10:46:05 2016 -0700 ---------------------------------------------------------------------- .../controller/PartitionLeaderSelector.scala | 9 +-- .../ControlledShutdownLeaderSelectorTest.scala | 73 ++++++++++++++++++++ 2 files changed, 76 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a1afbab3/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 5eed382..9d8b0b6 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -185,13 +185,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) - val newLeaderOpt = newIsr.headOption - newLeaderOpt match { + liveAssignedReplicas.filter(newIsr.contains).headOption match { case Some(newLeader) => - debug("Partition %s : current leader = %d, new leader = %d" - .format(topicAndPartition, currentLeader, newLeader)) - (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), - liveAssignedReplicas) + debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader)) + (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas) case None => throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" + " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(","))) http://git-wip-us.apache.org/repos/asf/kafka/blob/a1afbab3/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala new file mode 100644 index 0000000..f032eb6 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.api.LeaderAndIsr +import kafka.common.TopicAndPartition +import kafka.controller.{ControlledShutdownLeaderSelector, ControllerContext} +import org.easymock.EasyMock +import org.junit.{Assert, Test} +import Assert._ +import kafka.cluster.Broker +import kafka.utils.ZkUtils + +import scala.collection.mutable + +class ControlledShutdownLeaderSelectorTest { + + @Test + def testSelectLeader() { + val topicPartition = TopicAndPartition("topic", 1) + val assignment = Seq(6, 5, 4, 3, 2, 1) + val preferredReplicaId = assignment.head + + val firstIsr = List(1, 3, 6) + val firstLeader = 1 + + val zkUtils = EasyMock.mock(classOf[ZkUtils]) + val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000) + controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet + controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3) + controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment) + + val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext) + val firstLeaderAndIsr = new LeaderAndIsr(firstLeader, firstIsr) + val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr) + + assertEquals(preferredReplicaId, secondLeaderAndIsr.leader) + assertEquals(Seq(1, 6), secondLeaderAndIsr.isr) + assertEquals(1, secondLeaderAndIsr.zkVersion) + assertEquals(1, secondLeaderAndIsr.leaderEpoch) + assertEquals(assignment, secondReplicas) + + controllerContext.shuttingDownBrokerIds += preferredReplicaId + + val deadBrokerId = 2 + controllerContext.liveBrokers = controllerContext.liveOrShuttingDownBrokers.filter(_.id != deadBrokerId) + controllerContext.shuttingDownBrokerIds -= deadBrokerId + + val (thirdLeaderAndIsr, thirdReplicas) = leaderSelector.selectLeader(topicPartition, secondLeaderAndIsr) + + assertEquals(1, thirdLeaderAndIsr.leader) + assertEquals(Seq(1), thirdLeaderAndIsr.isr) + assertEquals(2, thirdLeaderAndIsr.zkVersion) + assertEquals(2, thirdLeaderAndIsr.leaderEpoch) + assertEquals(Seq(6, 5, 4, 3, 1), thirdReplicas) + + } + +}
