Updated Branches:
  refs/heads/trunk 3b470f56b -> 2cda5d1fc
kafka-813; Minor cleanup in Controller; patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/28ee7855
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/28ee7855
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/28ee7855

Branch: refs/heads/trunk
Commit: 28ee7855360b8e6ea690fc802dac4eaa60ad81e2
Parents: 51421fc
Author: Swapnil Ghike <sgh...@linkedin.com>
Authored: Mon Mar 25 09:31:57 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Mar 25 09:31:57 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/api/LeaderAndIsrRequest.scala      |  7 ++-
 .../common/PartitionOfflineException.scala         | 28 ------------
 .../controller/ControllerChannelManager.scala      |  6 ++-
 .../kafka/controller/KafkaController.scala         | 47 +++++++++++++-------
 .../controller/PartitionLeaderSelector.scala       | 44 +++++++++++-------
 .../controller/PartitionStateMachine.scala         | 28 +++++-------
 .../kafka/controller/ReplicaStateMachine.scala     |  6 +--
 .../main/scala/kafka/server/KafkaServer.scala      |  2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |  6 +--
 .../api/RequestResponseSerializationTest.scala     |  2 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |  3 +-
 11 files changed, 87 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index b40522d..3b7ee24 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -83,7 +83,6 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr

 object LeaderAndIsrRequest {
   val CurrentVersion = 0.shortValue
-  val DefaultClientId = ""
   val IsInit: Boolean = true
   val NotInit: Boolean = false
   val DefaultAckTimeout: Int = 1000
@@ -126,9 +125,9 @@ case class LeaderAndIsrRequest (versionId: Short,
   extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {

   def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
-           controllerEpoch: Int, correlationId: Int) = {
-    this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
-         controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
+           controllerEpoch: Int, correlationId: Int, clientId: String) = {
+    this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, LeaderAndIsrRequest.DefaultAckTimeout,
+         controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
   }

   def writeTo(buffer: ByteBuffer) {
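
With DefaultClientId removed, every caller of this convenience constructor now supplies a clientId explicitly. A minimal sketch of the new call site, using placeholder arguments (the empty map/set and the literal clientId string below are illustrative, not from this commit):

    import kafka.api.{LeaderAndIsrRequest, PartitionStateInfo}
    import kafka.cluster.Broker

    // Placeholder arguments; real callers take these from the controller context.
    val partitionStateInfos = Map.empty[(String, Int), PartitionStateInfo]
    val liveBrokers = Set.empty[Broker]

    // The clientId is now mandatory; the controller passes its own identity string
    // so the receiving broker's state-change logs can attribute the request.
    val request = new LeaderAndIsrRequest(partitionStateInfos, liveBrokers,
      /* controllerId */ 0, /* controllerEpoch */ 1, /* correlationId */ 0,
      "id_0-host_localhost-port_9092")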

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/common/PartitionOfflineException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/PartitionOfflineException.scala b/core/src/main/scala/kafka/common/PartitionOfflineException.scala
deleted file mode 100644
index 3367708..0000000
--- a/core/src/main/scala/kafka/common/PartitionOfflineException.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-
-/**
- * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
- * all the replicas for a partition are offline
- */
-class PartitionOfflineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
-  def this(message: String) = this(message, null)
-  def this() = this(null, null)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6e563d2..f7a7bd4 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -144,7 +144,8 @@ class RequestSendThread(val controllerId: Int,
   }
 }

-class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, controllerId: Int)
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
+                                   controllerId: Int, clientId: String)
   extends Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
@@ -190,7 +191,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId,
+                                                        clientId)
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
         stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " +
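
The patch deletes PartitionOfflineException and switches callers to NoReplicaOnlineException, whose definition is not part of this diff. Assuming it mirrors the deleted class (a guess based on the usage elsewhere in this patch, not something this commit shows), a minimal equivalent would be:

    package kafka.common

    /**
     * Assumed shape: thrown by the controller's leader elector when leader
     * election fails because no replica for the partition is online.
     */
    class NoReplicaOnlineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
      def this(message: String) = this(message, null)
      def this() = this(null, null)
    }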

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 25a8cfe..6e07096 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -46,7 +46,7 @@ class ControllerContext(val zkClient: ZkClient,
                         val correlationId: AtomicInteger = new AtomicInteger(0),
                         var allTopics: Set[String] = Set.empty,
                         var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
-                        var allLeaders: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
+                        var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
@@ -87,10 +87,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, config.brokerId)
+  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, this.config.brokerId, this.clientId)

   registerControllerChangedListener()

   newGauge(
@@ -100,8 +101,21 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   )

+  newGauge(
+    "OfflinePartitionsCount",
+    new Gauge[Int] {
+      def getValue: Int = {
+        controllerContext.controllerLock synchronized {
+          controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader))
+        }
+      }
+    }
+  )
+
   def epoch = controllerContext.epoch

+  def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
+
   /**
    * JMX operation to initiate clean shutdown of a broker. On clean shutdown,
    * the controller first determines the partitions that the shutting down
@@ -137,8 +151,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }

   def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
-    trace("All leaders = " + controllerContext.allLeaders.mkString(","))
-    controllerContext.allLeaders.filter {
+    trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+    controllerContext.partitionLeadershipInfo.filter {
       case (topicAndPartition, leaderIsrAndControllerEpoch) =>
         leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
     }.map(_._1)
@@ -151,11 +165,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         val (topic, partition) = topicAndPartition.asTuple
         // move leadership serially to relinquish lock.
        controllerContext.controllerLock synchronized {
-          controllerContext.allLeaders.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
+          controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
             if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                controlledShutdownPartitionLeaderSelector)
-              val newLeaderIsrAndControllerEpoch = controllerContext.allLeaders(topicAndPartition)
+                                                       controlledShutdownPartitionLeaderSelector)
+              val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition)
               // mark replica offline only if leadership was moved successfully
               if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
@@ -180,7 +194,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       allPartitionsAndReplicationFactorOnBroker foreach {
         case(topicAndPartition, replicationFactor) =>
           val (topic, partition) = topicAndPartition.asTuple
-          if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
+          if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) {
             brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
             removeReplicaFromIsr(topic, partition, id) match {
               case Some(updatedLeaderIsrAndControllerEpoch) =>
@@ -289,7 +303,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     val deadBrokersSet = deadBrokers.toSet

     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
-    val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
+    val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
       deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
@@ -321,7 +335,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
     replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica)
-    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition)
+    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
     replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica)
   }

@@ -450,7 +464,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
-    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
+    controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
@@ -482,7 +496,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
     // check if they are already completed
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
-      controllerContext.allLeaders(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
+      controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
     controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
     info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
@@ -502,7 +516,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
         controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
           case true =>
-            controllerContext.allLeaders.put(topicPartition, leaderIsrAndControllerEpoch)
+            controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
           case false =>
             debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) +
               "partition %s is dead, just ignore it".format(topicPartition))
@@ -522,7 +536,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
                                                       reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@@ -626,7 +640,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
-      val currentLeader = controllerContext.allLeaders(partition).leaderAndIsr.leader
+      val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
       if(currentLeader == preferredReplica) {
         info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
New leader is %d".format(partition, preferredReplica)) @@ -965,7 +979,6 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int) case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) object ControllerStats extends KafkaMetricsGroup { - val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS) - val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) + val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 3ed9b7e..d295781 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -18,14 +18,14 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.utils.Logging -import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException} +import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} trait PartitionLeaderSelector { /** * @param topicAndPartition The topic and partition whose leader needs to be elected * @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper - * @throws PartitionOfflineException If no replica in the assigned replicas list is alive + * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive * @return The leader and isr request, with the newly selected leader info, to send to the brokers * Also, returns the list of replicas the returned leader and isr request should be sent to * This API selects a new leader for the input partition @@ -38,7 +38,7 @@ trait PartitionLeaderSelector { * This API selects a new leader for the input partition - * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader * 2. Else, it picks some alive broker from the assigned replica list as the new leader - * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException + * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache */ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { @@ -57,8 +57,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten .format(liveAssignedReplicasToThisPartition.mkString(","))) liveAssignedReplicasToThisPartition.isEmpty match { case true => - ControllerStats.offlinePartitionRate.mark() - throw new PartitionOfflineException(("No replica for partition " + + throw new NoReplicaOnlineException(("No replica for partition " + "%s is alive. 
Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + " Assigned replicas are: [%s]".format(assignedReplicas)) case false => @@ -76,30 +75,31 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) (newLeaderAndIsr, liveAssignedReplicasToThisPartition) case None => - ControllerStats.offlinePartitionRate.mark() - throw new PartitionOfflineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it") + throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it") } } } /** - * Picks one of the alive in-sync reassigned replicas as the new leader + * Picks one of the alive in-sync reassigned replicas as the new leader. */ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { this.logIdent = "[ReassignedPartitionLeaderSelector]: " + /** + * The reassigned replicas are already in the ISR when selectLeader is called. + */ def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { - val reassignedReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas + val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion - // pick any replica from the newly assigned replicas list that is in the ISR - val aliveReassignedReplicas = reassignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) - val newLeaderOpt = aliveReassignedReplicas.headOption + val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) + val newLeaderOpt = aliveReassignedInSyncReplicas.headOption newLeaderOpt match { case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr, - currentLeaderIsrZkPathVersion + 1), reassignedReplicas) + currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas) case None => - reassignedReplicas.size match { + reassignedInSyncReplicas.size match { case 0 => throw new StateChangeFailedException("List of reassigned replicas for partition " + " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) @@ -124,7 +124,7 @@ with Logging { val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val preferredReplica = assignedReplicas.head // check if preferred replica is the current leader - val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader + val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader if(currentLeader == preferredReplica) { throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s" .format(preferredReplica, topicAndPartition)) @@ -177,6 +177,18 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(","))) } } - } +/** + * Essentially does nothing. 
               "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
               " Assigned replicas are: [%s]".format(assignedReplicas))
           case false =>
@@ -76,30 +75,31 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
             info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
             (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
           case None =>
-            ControllerStats.offlinePartitionRate.mark()
-            throw new PartitionOfflineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
+            throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
         }
     }
   }
 }

 /**
- * Picks one of the alive in-sync reassigned replicas as the new leader
+ * Picks one of the alive in-sync reassigned replicas as the new leader.
  */
 class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[ReassignedPartitionLeaderSelector]: "

+  /**
+   * The reassigned replicas are already in the ISR when selectLeader is called.
+   */
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val reassignedReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
+    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-    // pick any replica from the newly assigned replicas list that is in the ISR
-    val aliveReassignedReplicas = reassignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
-    val newLeaderOpt = aliveReassignedReplicas.headOption
+    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
     newLeaderOpt match {
       case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
-        currentLeaderIsrZkPathVersion + 1), reassignedReplicas)
+        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
       case None =>
-        reassignedReplicas.size match {
+        reassignedInSyncReplicas.size match {
           case 0 =>
             throw new StateChangeFailedException("List of reassigned replicas for partition " +
               " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
@@ -124,7 +124,7 @@ with Logging {
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
-    val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
     if(currentLeader == preferredReplica) {
       throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
         .format(preferredReplica, topicAndPartition))
@@ -177,6 +177,18 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
         " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
     }
   }
-
 }
+
+/**
+ * Essentially does nothing. Returns the current leader and ISR, and the current
+ * set of replicas assigned to a given topic/partition.
+ */
+class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+
+  this.logIdent = "[NoOpLeaderSelector]: "
+
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
+    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
+  }
+}
\ No newline at end of file
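
The OfflinePartitionLeaderSelector's preference order (a live ISR member first, then any live assigned replica, then failure) distills to a small pure function. A sketch using plain Ints for broker ids and an Either in place of NoReplicaOnlineException:

    def pickLeader(assignedReplicas: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): Either[String, Int] = {
      val liveIsr = isr.filter(liveBrokers.contains)                    // 1. prefer a live broker in the ISR
      val liveAssigned = assignedReplicas.filter(liveBrokers.contains)  // 2. else any live assigned replica
      liveIsr.headOption.orElse(liveAssigned.headOption).toRight("no replica online") // 3. else fail
    }

    pickLeader(Seq(1, 2, 3), Seq(2, 3), Set(1))    // Right(1): unclean election outside the ISR
    pickLeader(Seq(1, 2, 3), Seq(2, 3), Set.empty) // Left("no replica online")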

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index b25e9f4..654fa2e 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
-import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
+import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -43,9 +43,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId)
-  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
+  private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)

@@ -86,7 +86,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
       for((topicAndPartition, partitionState) <- partitionState) {
         if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
-          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
+          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
     } catch {
@@ -101,7 +101,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param targetState The state that the partitions should be moved to
    */
   def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
-                         leaderSelector: PartitionLeaderSelector = offlinePartitionSelector) {
+                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector) {
     info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
@@ -111,6 +111,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
+      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
     }
   }

@@ -149,7 +150,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           case _ => // should never come here since illegal previous states are checked above
         }
         partitionState.put(topicAndPartition, OnlinePartition)
-        val leader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+        val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
         stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to OnlinePartition with leader %d"
           .format(controllerId, controller.epoch, topicAndPartition, partitionState(topicAndPartition), leader))
         // post: partition has a leader
@@ -172,7 +173,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     } catch {
       case t: Throwable =>
         stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
-          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
+                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
     }
   }

@@ -232,7 +233,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
       case 0 =>
-        ControllerStats.offlinePartitionRate.mark()
        val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
                       "live brokers are [%s]. No assigned replica is alive.")
                         .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
@@ -253,14 +253,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         // GC pause
         brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
           topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
-        controllerContext.allLeaders.put(topicAndPartition, leaderIsrAndControllerEpoch)
-        partitionState.put(topicAndPartition, OnlinePartition)
+        controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
       } catch {
         case e: ZkNodeExistsException =>
           // read the controller epoch
           val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
             topicAndPartition.partition).get
-          ControllerStats.offlinePartitionRate.mark()
           val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
                          "exists with value %s and controller epoch %d")
                            .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
@@ -310,22 +308,20 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       }
       val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
-      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
+      controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
       stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
         .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     } catch {
-      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas %s for partition %s are dead."
-        .format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(","), topicAndPartition) +
-        " Marking this partition offline", poe)
+      case nroe: NoReplicaOnlineException => throw nroe
       case sce =>
         val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
         stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
         throw new StateChangeFailedException(failMsg, sce)
     }
-    debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
+    debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
   }

   private def registerTopicChangeListener() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 88058ec..5146f12 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -42,7 +42,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -143,7 +143,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               .format(controllerId, controller.epoch, replicaId, topicAndPartition))
           case _ =>
             // check if the leader for this partition is alive or even exists
-            controllerContext.allLeaders.get(topicAndPartition) match {
+            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
               case Some(leaderIsrAndControllerEpoch) =>
                 controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
                   case true => // leader is alive
@@ -163,7 +163,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
         assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
         // As an optimization, the controller removes dead replicas from the ISR
         val leaderAndIsrIsEmpty: Boolean =
-          controllerContext.allLeaders.get(topicAndPartition) match {
+          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(currLeaderIsrAndControllerEpoch) =>
              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
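
Both state machines now build their ControllerBrokerRequestBatch with controller.clientId, whose format is defined in the KafkaController diff above. A small sketch of that format with example broker values:

    // Mirrors KafkaController.clientId; the id/host/port values here are examples.
    def controllerClientId(brokerId: Int, hostName: String, port: Int): String =
      "id_%d-host_%s-port_%d".format(brokerId, hostName, port)

    controllerClientId(0, "broker0", 9092) // "id_0-host_broker0-port_9092"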

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e7248c3..7298ccb 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -98,8 +98,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    */
   private def registerStats() {
     BrokerTopicStats.getBrokerAllTopicsStats()
-    ControllerStats.offlinePartitionRate
     ControllerStats.uncleanLeaderElectionRate
+    ControllerStats.leaderElectionTimer
   }

   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 4dd1ba7..6c80c4c 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -381,7 +381,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
       assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
       // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(2, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertEquals(2, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)

       leaderBeforeShutdown = leaderAfterShutdown
       controllerId = ZkUtils.getController(zkClient)
@@ -392,7 +392,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
       assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
       // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)

       leaderBeforeShutdown = leaderAfterShutdown
       controllerId = ZkUtils.getController(zkClient)
@@ -402,7 +402,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
       leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
       assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-      assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
     }
     finally {
       servers.foreach(_.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 02ff81f..0f15718 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -89,7 +89,7 @@ object SerializationTestUtils{
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0)
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "")
   }

   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index ec1db2d..8f88177 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -133,7 +133,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
     val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0)
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId,
+                                                      staleControllerEpoch, 0, "")

     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)