Repository: kafka Updated Branches: refs/heads/0.11.0 69970630b -> 999c247e0
KAFKA-5135; Controller Health Metrics (KIP-143) Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]>, Onur Karaman <[email protected]> Closes #2983 from ijuma/kafka-5135-controller-health-metrics-kip-143 (cherry picked from commit 516d8457d8142111a91af94cab918c84990685da) Signed-off-by: Ismael Juma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/999c247e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/999c247e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/999c247e Branch: refs/heads/0.11.0 Commit: 999c247e076cbf7e84141533c5a9042a844d57ae Parents: 6997063 Author: Ismael Juma <[email protected]> Authored: Wed May 24 00:36:52 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed May 24 00:38:29 2017 +0100 ---------------------------------------------------------------------- .../main/scala/kafka/cluster/Partition.scala | 3 +- .../controller/ControllerChannelManager.scala | 37 +- .../controller/ControllerEventManager.scala | 65 +++ .../kafka/controller/ControllerState.scala | 83 ++++ .../kafka/controller/KafkaController.scala | 456 ++++++++++--------- .../controller/PartitionLeaderSelector.scala | 2 +- .../kafka/controller/TopicDeletionManager.scala | 7 +- .../main/scala/kafka/metrics/KafkaTimer.scala | 8 +- .../scala/kafka/server/ReplicaManager.scala | 5 +- .../controller/ControllerEventManagerTest.scala | 94 ++++ .../controller/ControllerIntegrationTest.scala | 20 +- ...MetricsDuringTopicCreationDeletionTest.scala | 10 +- 12 files changed, 549 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index f123a16..e617404 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -528,7 +528,7 @@ class Partition(val topic: String, } private def updateIsr(newIsr: Set[Replica]) { - val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion) + val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion) val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId, newLeaderAndIsr, controllerEpoch, zkVersion) @@ -538,6 +538,7 @@ class Partition(val topic: String, zkVersion = newVersion trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion)) } else { + replicaManager.failedIsrUpdatesRate.mark() info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 41b9549..ea8d13b 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -19,9 +19,11 @@ package kafka.controller import java.net.SocketTimeoutException import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} +import com.yammer.metrics.core.Gauge import kafka.api._ import kafka.cluster.Broker import kafka.common.{KafkaException, TopicAndPartition} +import kafka.metrics.KafkaMetricsGroup import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.clients._ @@ -38,11 +40,26 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.collection.{Set, mutable} -class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging { +object ControllerChannelManager { + val QueueSizeMetricName = "QueueSize" +} + +class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, + threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { + import ControllerChannelManager._ protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo] private val brokerLock = new Object this.logIdent = "[Channel manager on controller " + config.brokerId + "]: " + newGauge( + "TotalQueueSize", + new Gauge[Int] { + def value: Int = brokerLock synchronized { + brokerStateInfo.values.iterator.map(_.messageQueue.size).sum + } + } + ) + controllerContext.liveBrokers.foreach(addNewBroker) def startup() = { @@ -133,9 +150,21 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient, brokerNode, config, time, threadName) requestThread.setDaemon(false) - brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread)) + + val queueSizeGauge = newGauge( + QueueSizeMetricName, + new Gauge[Int] { + def value: Int = messageQueue.size + }, + queueSizeTags(broker.id) + ) + + brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, + requestThread, queueSizeGauge)) } + private def queueSizeTags(brokerId: Int) = Map("broker-id" -> brokerId.toString) + private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) { try { // Shutdown the RequestSendThread before closing the NetworkClient to avoid the concurrent use of the @@ -145,6 +174,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf brokerState.requestSendThread.shutdown() brokerState.networkClient.close() brokerState.messageQueue.clear() + removeMetric(QueueSizeMetricName, queueSizeTags(brokerState.brokerNode.id)) brokerStateInfo.remove(brokerState.brokerNode.id) } catch { case e: Throwable => error("Error while removing broker by the controller", e) @@ -465,7 +495,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging case class ControllerBrokerStateInfo(networkClient: NetworkClient, brokerNode: Node, messageQueue: BlockingQueue[QueueItem], - requestSendThread: RequestSendThread) + requestSendThread: RequestSendThread, + queueSizeGauge: Gauge[Int]) case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null) http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/ControllerEventManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala new file mode 100644 index 0000000..3c0da23 --- /dev/null +++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.controller + +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection._ + +import kafka.metrics.KafkaTimer +import kafka.utils.ShutdownableThread + +class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer], + eventProcessedListener: ControllerEvent => Unit) { + + @volatile private var _state: ControllerState = ControllerState.Idle + + private val queue = new LinkedBlockingQueue[ControllerEvent] + private val thread = new ControllerEventThread("controller-event-thread") + + def state: ControllerState = _state + + def start(): Unit = thread.start() + + def close(): Unit = thread.shutdown() + + def put(event: ControllerEvent): Unit = queue.put(event) + + class ControllerEventThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + val controllerEvent = queue.take() + _state = controllerEvent.state + + try { + rateAndTimeMetrics(state).time { + controllerEvent.process() + } + } catch { + case e: Throwable => error(s"Error processing event $controllerEvent", e) + } + + try eventProcessedListener(controllerEvent) + catch { + case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e) + } + + _state = ControllerState.Idle + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/ControllerState.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala new file mode 100644 index 0000000..2f690bb --- /dev/null +++ b/core/src/main/scala/kafka/controller/ControllerState.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.controller + +import scala.collection.Seq + +sealed abstract class ControllerState { + + def value: Byte + + def rateAndTimeMetricName: Option[String] = + if (hasRateAndTimeMetric) Some(s"${toString}RateAndTimeMs") else None + + protected def hasRateAndTimeMetric: Boolean = true +} + +object ControllerState { + + // Note: `rateAndTimeMetricName` is based on the case object name by default. Changing a name is a breaking change + // unless `rateAndTimeMetricName` is overridden. + + case object Idle extends ControllerState { + def value = 0 + override protected def hasRateAndTimeMetric: Boolean = false + } + + case object ControllerChange extends ControllerState { + def value = 1 + } + + case object BrokerChange extends ControllerState { + def value = 2 + // The LeaderElectionRateAndTimeMs metric existed before `ControllerState` was introduced and we keep the name + // for backwards compatibility. The alternative would be to have the same metric under two different names. + override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs") + } + + case object TopicChange extends ControllerState { + def value = 3 + } + + case object TopicDeletion extends ControllerState { + def value = 4 + } + + case object PartitionReassignment extends ControllerState { + def value = 5 + } + + case object AutoLeaderBalance extends ControllerState { + def value = 6 + } + + case object ManualLeaderBalance extends ControllerState { + def value = 7 + } + + case object ControlledShutdown extends ControllerState { + def value = 8 + } + + case object IsrChange extends ControllerState { + def value = 9 + } + + val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion, + PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 69669cd..dbce485 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -16,10 +16,9 @@ */ package kafka.controller -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} +import java.util.concurrent.TimeUnit -import com.yammer.metrics.core.{Gauge, Meter} +import com.yammer.metrics.core.Gauge import kafka.admin.{AdminUtils, PreferredReplicaLeaderElectionCommand} import kafka.api._ import kafka.cluster.Broker @@ -42,7 +41,7 @@ import scala.collection._ import scala.util.Try class ControllerContext(val zkUtils: ZkUtils) { - val controllerStats = new ControllerStats + val stats = new ControllerStats var controllerChannelManager: ControllerChannelManager = null @@ -158,55 +157,59 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState // kafka server private val kafkaScheduler = new KafkaScheduler(1) - val topicDeletionManager = new TopicDeletionManager(this) + private val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics()) + + val topicDeletionManager = new TopicDeletionManager(this, eventManager) val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this) - private val controllerEventQueue = new LinkedBlockingQueue[ControllerEvent] - private val controllerEventThread = new ControllerEventThread("controller-event-thread") - - private val brokerChangeListener = new BrokerChangeListener(this) - private val topicChangeListener = new TopicChangeListener(this) - private val topicDeletionListener = new TopicDeletionListener(this) + private val brokerChangeListener = new BrokerChangeListener(this, eventManager) + private val topicChangeListener = new TopicChangeListener(this, eventManager) + private val topicDeletionListener = new TopicDeletionListener(this, eventManager) private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty - private val partitionReassignmentListener = new PartitionReassignmentListener(this) - private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) - private val isrChangeNotificationListener = new IsrChangeNotificationListener(this) + private val partitionReassignmentListener = new PartitionReassignmentListener(this, eventManager) + private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this, eventManager) + private val isrChangeNotificationListener = new IsrChangeNotificationListener(this, eventManager) - private val activeControllerId = new AtomicInteger(-1) - private val offlinePartitionCount = new AtomicInteger(0) - private val preferredReplicaImbalanceCount = new AtomicInteger(0) + @volatile private var activeControllerId = -1 + @volatile private var offlinePartitionCount = 0 + @volatile private var preferredReplicaImbalanceCount = 0 newGauge( "ActiveControllerCount", new Gauge[Int] { - def value() = if (isActive) 1 else 0 + def value = if (isActive) 1 else 0 } ) newGauge( "OfflinePartitionsCount", new Gauge[Int] { - def value(): Int = { - offlinePartitionCount.get() - } + def value: Int = offlinePartitionCount } ) newGauge( "PreferredReplicaImbalanceCount", new Gauge[Int] { - def value(): Int = { - preferredReplicaImbalanceCount.get() - } + def value: Int = preferredReplicaImbalanceCount + } + ) + + newGauge( + "ControllerState", + new Gauge[Byte] { + def value: Byte = state.value } ) def epoch: Int = controllerContext.epoch + def state: ControllerState = eventManager.state + def clientId: String = { val controllerListener = config.listeners.find(_.listenerName == config.interBrokerListenerName).getOrElse( throw new IllegalArgumentException(s"No listener with name ${config.interBrokerListenerName} is configured.")) @@ -223,7 +226,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState */ def shutdownBroker(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit): Unit = { val controlledShutdownEvent = ControlledShutdown(id, controlledShutdownCallback) - addToControllerEventQueue(controlledShutdownEvent) + eventManager.put(controlledShutdownEvent) } /** @@ -280,7 +283,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = { - kafkaScheduler.schedule("auto-leader-rebalance-task", () => addToControllerEventQueue(AutoPreferredReplicaLeaderElection), + kafkaScheduler.schedule("auto-leader-rebalance-task", () => eventManager.put(AutoPreferredReplicaLeaderElection), delay = delay, unit = unit) } @@ -301,8 +304,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState // shutdown leader rebalance scheduler kafkaScheduler.shutdown() - offlinePartitionCount.set(0) - preferredReplicaImbalanceCount.set(0) + offlinePartitionCount = 0 + preferredReplicaImbalanceCount = 0 // de-register partition ISR listener for on-going partition reassignment task deregisterPartitionReassignmentIsrChangeListeners() @@ -325,7 +328,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState /** * Returns true if this broker is the current controller. */ - def isActive: Boolean = activeControllerId.get() == config.brokerId + def isActive: Boolean = activeControllerId == config.brokerId /** * This callback is invoked by the replica state machine's broker change listener, with the list of newly started @@ -535,7 +538,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState partition: Int, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas - val isrChangeListener = new PartitionReassignmentIsrChangeListener(this, topic, partition, reassignedReplicas.toSet) + val isrChangeListener = new PartitionReassignmentIsrChangeListener(this, eventManager, topic, partition, + reassignedReplicas.toSet) reassignedPartitionContext.isrChangeListener = isrChangeListener // register listener on the leader and isr path to wait until they catch up with the current leader zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) @@ -589,8 +593,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState * elector */ def startup() = { - addToControllerEventQueue(Startup) - controllerEventThread.start() + eventManager.put(Startup) + eventManager.start() } /** @@ -599,7 +603,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState * shuts down the controller channel manager, if one exists (i.e. if it was the current controller) */ def shutdown() = { - controllerEventThread.shutdown() + eventManager.close() onControllerResignation() } @@ -640,11 +644,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } private def registerSessionExpirationListener() = { - zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this)) + zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this, eventManager)) } private def registerControllerChangeListener() = { - zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this)) + zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager)) } private def initializeControllerContext() { @@ -685,7 +689,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } private def resetControllerContext(): Unit = { - if(controllerContext.controllerChannelManager != null) { + if (controllerContext.controllerChannelManager != null) { controllerContext.controllerChannelManager.shutdown() controllerContext.controllerChannelManager = null } @@ -864,7 +868,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } def registerPartitionModificationsListener(topic: String) = { - partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, topic)) + partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, eventManager, topic)) zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic)) } @@ -1106,39 +1110,36 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState finalLeaderIsrAndControllerEpoch } - private def checkAndTriggerPartitionRebalance(): Unit = { - trace("checking need to trigger partition rebalance") - var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = controllerContext.partitionReplicaAssignment - .filterNot(p => topicDeletionManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy { - case (_, assignedReplicas) => assignedReplicas.head - } - debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers) + private def checkAndTriggerAutoLeaderRebalance(): Unit = { + trace("Checking need to trigger auto leader balancing") + val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = + controllerContext.partitionReplicaAssignment.filterNot { case (tp, _) => + topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) + }.groupBy { case (_, assignedReplicas) => assignedReplicas.head } + debug(s"Preferred replicas by broker $preferredReplicasForTopicsByBrokers") + // for each broker, check if a preferred replica election needs to be triggered - preferredReplicasForTopicsByBrokers.foreach { - case(leaderBroker, topicAndPartitionsForBroker) => { - var imbalanceRatio: Double = 0 - var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = topicAndPartitionsForBroker - .filter { case (topicPartition, _) => - controllerContext.partitionLeadershipInfo.contains(topicPartition) && - controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker - } - debug("topics not in preferred replica " + topicsNotInPreferredReplica) - val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size - val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size - imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker - trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio)) - // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions - // that need to be on this broker - if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { - topicsNotInPreferredReplica.keys.foreach { topicPartition => - // do this check only if the broker is live and there are no partitions being reassigned currently - // and preferred replica election is not in progress - if (controllerContext.liveBrokerIds.contains(leaderBroker) && - controllerContext.partitionsBeingReassigned.isEmpty && - !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) && - controllerContext.allTopics.contains(topicPartition.topic)) { - onPreferredReplicaElection(Set(topicPartition), true) - } + preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) => + val topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) => + val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition) + leadershipInfo.map(_.leaderAndIsr.leader != leaderBroker).getOrElse(false) + } + debug(s"Topics not in preferred replica $topicsNotInPreferredReplica") + + val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicAndPartitionsForBroker.size + trace(s"Leader imbalance ratio for broker $leaderBroker is $imbalanceRatio") + + // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions + // that need to be on this broker + if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { + topicsNotInPreferredReplica.keys.foreach { topicPartition => + // do this check only if the broker is live and there are no partitions being reassigned currently + // and preferred replica election is not in progress + if (controllerContext.liveBrokerIds.contains(leaderBroker) && + controllerContext.partitionsBeingReassigned.isEmpty && + !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) && + controllerContext.allTopics.contains(topicPartition.topic)) { + onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true) } } } @@ -1152,98 +1153,80 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } } - def addToControllerEventQueue(controllerEvent: ControllerEvent): Unit = { - controllerEventQueue.put(controllerEvent) - } + case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent { - class ControllerEventThread(name: String) extends ShutdownableThread(name = name) { - override def doWork(): Unit = { - val controllerEvent = controllerEventQueue.take() - try { - controllerEvent.process() - } catch { - case e: Throwable => error("Error processing event " + controllerEvent, e) - } - updateMetrics() - } - } + def state = ControllerState.BrokerChange - case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent { override def process(): Unit = { if (!isActive) return - controllerContext.controllerStats.leaderElectionTimer.time { - try { - val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo) - val curBrokerIds = curBrokers.map(_.id) - val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds - val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds - val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds - val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id)) - controllerContext.liveBrokers = curBrokers - val newBrokerIdsSorted = newBrokerIds.toSeq.sorted - val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted - val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted - info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s" - .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(","))) - newBrokers.foreach(controllerContext.controllerChannelManager.addBroker) - deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker) - if(newBrokerIds.nonEmpty) - onBrokerStartup(newBrokerIdsSorted) - if(deadBrokerIds.nonEmpty) - onBrokerFailure(deadBrokerIdsSorted) - } catch { - case e: Throwable => error("Error while handling broker changes", e) - } - } + val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo) + val curBrokerIds = curBrokers.map(_.id) + val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds + val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds + val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds + val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id)) + controllerContext.liveBrokers = curBrokers + val newBrokerIdsSorted = newBrokerIds.toSeq.sorted + val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted + val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted + info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s" + .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(","))) + newBrokers.foreach(controllerContext.controllerChannelManager.addBroker) + deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker) + if (newBrokerIds.nonEmpty) + onBrokerStartup(newBrokerIdsSorted) + if (deadBrokerIds.nonEmpty) + onBrokerFailure(deadBrokerIdsSorted) } } case class TopicChange(topics: Set[String]) extends ControllerEvent { + + def state = ControllerState.TopicChange + override def process(): Unit = { if (!isActive) return - try { - val newTopics = topics -- controllerContext.allTopics - val deletedTopics = controllerContext.allTopics -- topics - controllerContext.allTopics = topics - - val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq) - controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => - !deletedTopics.contains(p._1.topic)) - controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment) - info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, - deletedTopics, addedPartitionReplicaAssignment)) - if (newTopics.nonEmpty) - onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet) - } catch { - case e: Throwable => error("Error while handling new topic", e) - } + val newTopics = topics -- controllerContext.allTopics + val deletedTopics = controllerContext.allTopics -- topics + controllerContext.allTopics = topics + + val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq) + controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => + !deletedTopics.contains(p._1.topic)) + controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment) + info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, + deletedTopics, addedPartitionReplicaAssignment)) + if (newTopics.nonEmpty) + onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet) } } case class PartitionModifications(topic: String) extends ControllerEvent { + + def state = ControllerState.TopicChange + override def process(): Unit = { if (!isActive) return - try { - val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)) - val partitionsToBeAdded = partitionReplicaAssignment.filter(p => - !controllerContext.partitionReplicaAssignment.contains(p._1)) - if(topicDeletionManager.isTopicQueuedUpForDeletion(topic)) - error("Skipping adding partitions %s for topic %s since it is currently being deleted" - .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic)) - else { - if (partitionsToBeAdded.nonEmpty) { - info("New partitions to be added %s".format(partitionsToBeAdded)) - controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded) - onNewPartitionCreation(partitionsToBeAdded.keySet) - } + val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)) + val partitionsToBeAdded = partitionReplicaAssignment.filter(p => + !controllerContext.partitionReplicaAssignment.contains(p._1)) + if(topicDeletionManager.isTopicQueuedUpForDeletion(topic)) + error("Skipping adding partitions %s for topic %s since it is currently being deleted" + .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic)) + else { + if (partitionsToBeAdded.nonEmpty) { + info("New partitions to be added %s".format(partitionsToBeAdded)) + controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded) + onNewPartitionCreation(partitionsToBeAdded.keySet) } - } catch { - case e: Throwable => error("Error while handling add partitions for topic " + topic, e) } } } case class TopicDeletion(var topicsToBeDeleted: Set[String]) extends ControllerEvent { + + def state = ControllerState.TopicDeletion + override def process(): Unit = { if (!isActive) return debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(","))) @@ -1277,6 +1260,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } case class PartitionReassignment(partitionReassignment: Map[TopicAndPartition, Seq[Int]]) extends ControllerEvent { + + def state = ControllerState.PartitionReassignment + override def process(): Unit = { if (!isActive) return val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1)) @@ -1290,46 +1276,48 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context) } } - } + } case class PartitionReassignmentIsrChange(topicAndPartition: TopicAndPartition, reassignedReplicas: Set[Int]) extends ControllerEvent { + + def state = ControllerState.PartitionReassignment + override def process(): Unit = { if (!isActive) return - try { // check if this partition is still being reassigned or not - controllerContext.partitionsBeingReassigned.get(topicAndPartition) match { - case Some(reassignedPartitionContext) => - // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object - val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition) - newLeaderAndIsrOpt match { - case Some(leaderAndIsr) => // check if new replicas have joined ISR - val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet - if(caughtUpReplicas == reassignedReplicas) { - // resume the partition reassignment process - info("%d/%d replicas have caught up with the leader for partition %s being reassigned." - .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) + - "Resuming partition reassignment") - onPartitionReassignment(topicAndPartition, reassignedPartitionContext) - } - else { - info("%d/%d replicas have caught up with the leader for partition %s being reassigned." - .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) + - "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(","))) - } - case None => error("Error handling reassignment of partition %s to replicas %s as it was never created" - .format(topicAndPartition, reassignedReplicas.mkString(","))) - } - case None => - } - } catch { - case e: Throwable => error("Error while handling partition reassignment", e) + controllerContext.partitionsBeingReassigned.get(topicAndPartition) match { + case Some(reassignedPartitionContext) => + // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object + val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition) + newLeaderAndIsrOpt match { + case Some(leaderAndIsr) => // check if new replicas have joined ISR + val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet + if(caughtUpReplicas == reassignedReplicas) { + // resume the partition reassignment process + info("%d/%d replicas have caught up with the leader for partition %s being reassigned." + .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) + + "Resuming partition reassignment") + onPartitionReassignment(topicAndPartition, reassignedPartitionContext) + } + else { + info("%d/%d replicas have caught up with the leader for partition %s being reassigned." + .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) + + "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(","))) + } + case None => error("Error handling reassignment of partition %s to replicas %s as it was never created" + .format(topicAndPartition, reassignedReplicas.mkString(","))) + } + case None => } } } case class IsrChangeNotification(sequenceNumbers: Seq[String]) extends ControllerEvent { + + def state = ControllerState.IsrChange + override def process(): Unit = { if (!isActive) return try { @@ -1381,6 +1369,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } case class PreferredReplicaLeaderElection(partitions: Set[TopicAndPartition]) extends ControllerEvent { + + def state = ControllerState.ManualLeaderBalance + override def process(): Unit = { if (!isActive) return val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) @@ -1390,13 +1381,17 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted) } + } case object AutoPreferredReplicaLeaderElection extends ControllerEvent { + + def state = ControllerState.AutoLeaderBalance + override def process(): Unit = { if (!isActive) return try { - checkAndTriggerPartitionRebalance() + checkAndTriggerAutoLeaderRebalance() } finally { scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS) } @@ -1404,6 +1399,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent { + + def state = ControllerState.ControlledShutdown + override def process(): Unit = { val controlledShutdownResult = Try { doControlledShutdown(id) } controlledShutdownCallback(controlledShutdownResult) @@ -1473,6 +1471,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } case class TopicDeletionStopReplicaResult(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent { + + def state = ControllerState.TopicDeletion + override def process(): Unit = { import JavaConverters._ if (!isActive) return @@ -1494,57 +1495,70 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState } case object Startup extends ControllerEvent { + + def state = ControllerState.ControllerChange + override def process(): Unit = { registerSessionExpirationListener() registerControllerChangeListener() elect() } + } case class ControllerChange(newControllerId: Int) extends ControllerEvent { + + def state = ControllerState.ControllerChange + override def process(): Unit = { val wasActiveBeforeChange = isActive - activeControllerId.set(newControllerId) + activeControllerId = newControllerId if (wasActiveBeforeChange && !isActive) { onControllerResignation() } } + } case object Reelect extends ControllerEvent { + + def state = ControllerState.ControllerChange + override def process(): Unit = { val wasActiveBeforeChange = isActive - activeControllerId.set(getControllerID()) + activeControllerId = getControllerID() if (wasActiveBeforeChange && !isActive) { onControllerResignation() } elect() } + } private def updateMetrics(): Unit = { - val opc = if (!isActive) - 0 - else - controllerContext.partitionLeadershipInfo.count(p => - !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader) && - !topicDeletionManager.isTopicQueuedUpForDeletion(p._1.topic) - ) - offlinePartitionCount.set(opc) - - val pric = if (!isActive) - 0 - else - controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) => - controllerContext.partitionLeadershipInfo.contains(topicPartition) && - controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head && - !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) + offlinePartitionCount = + if (!isActive) 0 + else { + controllerContext.partitionLeadershipInfo.count { case (tp, leadershipInfo) => + !controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader) && + !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) + } + } + + preferredReplicaImbalanceCount = + if (!isActive) 0 + else { + controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) => + val preferredReplica = replicas.head + val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition) + leadershipInfo.map(_.leaderAndIsr.leader != preferredReplica).getOrElse(false) && + !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) + } } - preferredReplicaImbalanceCount.set(pric) } private def triggerControllerMove(): Unit = { - activeControllerId.set(-1) + activeControllerId = -1 controllerContext.zkUtils.deletePath(ZkUtils.ControllerPath) } @@ -1552,14 +1566,14 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState val timestamp = time.milliseconds val electString = ZkUtils.controllerZkData(config.brokerId, timestamp) - activeControllerId.set(getControllerID()) + activeControllerId = getControllerID() /* * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, * it's possible that the controller has already been elected when we get here. This check will prevent the following * createEphemeralPath method from getting into an infinite loop if this broker is already the controller. */ - if(activeControllerId.get() != -1) { - debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId.get())) + if (activeControllerId != -1) { + debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId)) return } @@ -1570,15 +1584,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState controllerContext.zkUtils.isSecure) zkCheckedEphemeral.create() info(config.brokerId + " successfully elected as the controller") - activeControllerId.set(config.brokerId) + activeControllerId = config.brokerId onControllerFailover() } catch { case _: ZkNodeExistsException => // If someone else has written the path, then - activeControllerId.set(getControllerID) + activeControllerId = getControllerID - if (activeControllerId.get() != -1) - debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId.get(), config.brokerId)) + if (activeControllerId != -1) + debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId)) else warn("A controller has been elected but just resigned, this will result in another round of election") @@ -1592,23 +1606,23 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState /** * This is the zookeeper listener that triggers all the state transitions for a replica */ -class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging { +class BrokerChangeListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging { override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = { import JavaConverters._ - controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala)) + eventManager.put(controller.BrokerChange(currentChilds.asScala)) } } -class TopicChangeListener(controller: KafkaController) extends IZkChildListener with Logging { +class TopicChangeListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging { override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = { import JavaConverters._ - controller.addToControllerEventQueue(controller.TopicChange(currentChilds.asScala.toSet)) + eventManager.put(controller.TopicChange(currentChilds.asScala.toSet)) } } -class PartitionModificationsListener(controller: KafkaController, topic: String) extends IZkDataListener with Logging { +class PartitionModificationsListener(controller: KafkaController, eventManager: ControllerEventManager, topic: String) extends IZkDataListener with Logging { override def handleDataChange(dataPath: String, data: Any): Unit = { - controller.addToControllerEventQueue(controller.PartitionModifications(topic)) + eventManager.put(controller.PartitionModifications(topic)) } override def handleDataDeleted(dataPath: String): Unit = {} @@ -1619,10 +1633,10 @@ class PartitionModificationsListener(controller: KafkaController, topic: String) * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists * 2. If there are topics to be deleted, it signals the delete topic thread */ -class TopicDeletionListener(controller: KafkaController) extends IZkChildListener with Logging { +class TopicDeletionListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging { override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = { import JavaConverters._ - controller.addToControllerEventQueue(controller.TopicDeletion(currentChilds.asScala.toSet)) + eventManager.put(controller.TopicDeletion(currentChilds.asScala.toSet)) } } @@ -1634,18 +1648,19 @@ class TopicDeletionListener(controller: KafkaController) extends IZkChildListene * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned * partitions. */ -class PartitionReassignmentListener(controller: KafkaController) extends IZkDataListener with Logging { +class PartitionReassignmentListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkDataListener with Logging { override def handleDataChange(dataPath: String, data: Any): Unit = { val partitionReassignment = ZkUtils.parsePartitionReassignmentData(data.toString) - controller.addToControllerEventQueue(controller.PartitionReassignment(partitionReassignment)) + eventManager.put(controller.PartitionReassignment(partitionReassignment)) } override def handleDataDeleted(dataPath: String): Unit = {} } -class PartitionReassignmentIsrChangeListener(controller: KafkaController, topic: String, partition: Int, reassignedReplicas: Set[Int]) extends IZkDataListener with Logging { +class PartitionReassignmentIsrChangeListener(controller: KafkaController, eventManager: ControllerEventManager, + topic: String, partition: Int, reassignedReplicas: Set[Int]) extends IZkDataListener with Logging { override def handleDataChange(dataPath: String, data: Any): Unit = { - controller.addToControllerEventQueue(controller.PartitionReassignmentIsrChange(TopicAndPartition(topic, partition), reassignedReplicas)) + eventManager.put(controller.PartitionReassignmentIsrChange(TopicAndPartition(topic, partition), reassignedReplicas)) } override def handleDataDeleted(dataPath: String): Unit = {} @@ -1654,10 +1669,10 @@ class PartitionReassignmentIsrChangeListener(controller: KafkaController, topic: /** * Called when replica leader initiates isr change */ -class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging { +class IsrChangeNotificationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkChildListener with Logging { override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = { import JavaConverters._ - controller.addToControllerEventQueue(controller.IsrChangeNotification(currentChilds.asScala)) + eventManager.put(controller.IsrChangeNotification(currentChilds.asScala)) } } @@ -1669,26 +1684,26 @@ object IsrChangeNotificationListener { * Starts the preferred replica leader election for the list of partitions specified under * /admin/preferred_replica_election - */ -class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging { +class PreferredReplicaElectionListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkDataListener with Logging { override def handleDataChange(dataPath: String, data: Any): Unit = { val partitions = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString) - controller.addToControllerEventQueue(controller.PreferredReplicaLeaderElection(partitions)) + eventManager.put(controller.PreferredReplicaLeaderElection(partitions)) } override def handleDataDeleted(dataPath: String): Unit = {} } -class ControllerChangeListener(controller: KafkaController) extends IZkDataListener { +class ControllerChangeListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkDataListener { override def handleDataChange(dataPath: String, data: Any): Unit = { - controller.addToControllerEventQueue(controller.ControllerChange(KafkaController.parseControllerId(data.toString))) + eventManager.put(controller.ControllerChange(KafkaController.parseControllerId(data.toString))) } override def handleDataDeleted(dataPath: String): Unit = { - controller.addToControllerEventQueue(controller.Reelect) + eventManager.put(controller.Reelect) } } -class SessionExpirationListener(controller: KafkaController) extends IZkStateListener with Logging { +class SessionExpirationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkStateListener with Logging { override def handleStateChanged(state: KeeperState) { // do nothing, since zkclient will do reconnect for us. } @@ -1701,7 +1716,7 @@ class SessionExpirationListener(controller: KafkaController) extends IZkStateLis */ @throws[Exception] override def handleNewSession(): Unit = { - controller.addToControllerEventQueue(controller.Reelect) + eventManager.put(controller.Reelect) } override def handleSessionEstablishmentError(error: Throwable): Unit = { @@ -1731,9 +1746,16 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo private[controller] class ControllerStats extends KafkaMetricsGroup { val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS) - val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + + val rateAndTimeMetrics: Map[ControllerState, KafkaTimer] = ControllerState.values.flatMap { state => + state.rateAndTimeMetricName.map { metricName => + state -> new KafkaTimer(newTimer(s"$metricName", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + } + }.toMap + } sealed trait ControllerEvent { + def state: ControllerState def process(): Unit } http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 42db26b..54bbb89 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -74,7 +74,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. Live " + s"brokers are: [${controllerContext.liveBrokerIds}]. Assigned replicas are: [$assignedReplicas].") } else { - controllerContext.controllerStats.uncleanLeaderElectionRate.mark() + controllerContext.stats.uncleanLeaderElectionRate.mark() val newLeader = liveAssignedReplicas.head warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live " + s"brokers ${liveAssignedReplicas.mkString(",")}. There's potential data loss.") http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/controller/TopicDeletionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index b1f98b5..4920a6b 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -57,7 +57,7 @@ import scala.collection.{Set, mutable} * it marks the topic for deletion retry. * @param controller */ -class TopicDeletionManager(controller: KafkaController) extends Logging { +class TopicDeletionManager(controller: KafkaController, eventManager: ControllerEventManager) extends Logging { this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], " val controllerContext = controller.controllerContext val partitionStateMachine = controller.partitionStateMachine @@ -298,8 +298,9 @@ class TopicDeletionManager(controller: KafkaController) extends Logging { replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica) debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(","))) controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted, - new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) => controller.addToControllerEventQueue(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build) - if(deadReplicasForTopic.nonEmpty) { + new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) => + eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build) + if (deadReplicasForTopic.nonEmpty) { debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic)) markTopicIneligibleForDeletion(Set(topic)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/metrics/KafkaTimer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaTimer.scala b/core/src/main/scala/kafka/metrics/KafkaTimer.scala index 7f76f2d..24b54d6 100644 --- a/core/src/main/scala/kafka/metrics/KafkaTimer.scala +++ b/core/src/main/scala/kafka/metrics/KafkaTimer.scala @@ -28,12 +28,8 @@ class KafkaTimer(metric: Timer) { def time[A](f: => A): A = { val ctx = metric.time - try { - f - } - finally { - ctx.stop() - } + try f + finally ctx.stop() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 7310a06..fedbafb 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -171,8 +171,9 @@ class ReplicaManager(val config: KafkaConfig, def value = underReplicatedPartitionCount } ) - val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) - val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) + val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) + val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) + val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS) def underReplicatedPartitionCount: Int = getLeaderPartitions.count(_.isUnderReplicated) http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala new file mode 100644 index 0000000..fccb566 --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.controller + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicInteger + +import com.yammer.metrics.Metrics +import com.yammer.metrics.core.Timer +import kafka.utils.TestUtils +import org.easymock.{EasyMock, IAnswer} +import org.junit.Test +import org.junit.Assert.{assertEquals, fail} + +import scala.collection.JavaConverters._ + +class ControllerEventManagerTest { + + private var controllerEventManager: ControllerEventManager = _ + + def tearDown(): Unit = { + if (controllerEventManager != null) + controllerEventManager.close() + } + + @Test + def testSuccessfulEvent(): Unit = { + check("kafka.controller:type=ControllerStats,name=AutoLeaderBalanceRateAndTimeMs", ControllerState.AutoLeaderBalance, + () => Unit) + } + + @Test + def testEventThatThrowsException(): Unit = { + check("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs", ControllerState.BrokerChange, + () => throw new NullPointerException) + } + + private def check(metricName: String, controllerState: ControllerState, process: () => Unit): Unit = { + val controllerStats = new ControllerStats + val eventProcessedListenerCount = new AtomicInteger + controllerEventManager = new ControllerEventManager(controllerStats.rateAndTimeMetrics, + _ => eventProcessedListenerCount.incrementAndGet) + controllerEventManager.start() + + val initialTimerCount = timer(metricName).count + + // `ControllerEvent` is sealed so we use EasyMock to create a subclass + val eventMock = EasyMock.createMock(classOf[ControllerEvent]) + EasyMock.expect(eventMock.state).andReturn(controllerState) + + // Only return from `process()` once we have checked `controllerEventManager.state` + val latch = new CountDownLatch(1) + EasyMock.expect(eventMock.process()).andAnswer(new IAnswer[Unit]() { + def answer(): Unit = { + latch.await() + process() + } + }) + + EasyMock.replay(eventMock) + + controllerEventManager.put(eventMock) + TestUtils.waitUntilTrue(() => controllerEventManager.state == controllerState, + s"Controller state is not $controllerState") + latch.countDown() + + TestUtils.waitUntilTrue(() => controllerEventManager.state == ControllerState.Idle, + "Controller state has not changed back to Idle") + assertEquals(1, eventProcessedListenerCount.get) + + assertEquals("Timer has not been updated", initialTimerCount + 1, timer(metricName).count) + } + + private def timer(metricName: String): Timer = { + Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption + .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Timer] + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 1837ba2..2df93c7 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -17,12 +17,17 @@ package kafka.controller +import com.yammer.metrics.Metrics +import com.yammer.metrics.core.Timer import kafka.api.LeaderAndIsr import kafka.common.TopicAndPartition import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.junit.{After, Before, Test} +import org.junit.Assert.assertTrue + +import scala.collection.JavaConverters._ class ControllerIntegrationTest extends ZooKeeperTestHarness { var servers = Seq.empty[KafkaServer] @@ -129,6 +134,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { def testPartitionReassignment(): Unit = { servers = makeServers(2) val controllerId = TestUtils.waitUntilControllerElected(zkUtils) + + val metricName = s"kafka.controller:type=ControllerStats,name=${ControllerState.PartitionReassignment.rateAndTimeMetricName.get}" + val timerCount = timer(metricName).count + val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head val tp = TopicAndPartition("t", 0) val assignment = Map(tp.partition -> Seq(controllerId)) @@ -141,6 +150,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { "failed to get updated partition assignment on topic znode after partition reassignment") TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath), "failed to remove reassign partitions path after completion") + + val updatedTimerCount = timer(metricName).count + assertTrue(s"Timer count $updatedTimerCount should be greater than $timerCount", updatedTimerCount > timerCount) } @Test @@ -312,4 +324,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { } configs.map(config => TestUtils.createServer(KafkaConfig.fromProps(config))) } -} \ No newline at end of file + + private def timer(metricName: String): Timer = { + Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption + .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Timer] + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/999c247e/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala index 19a0f9d..60a6fb6 100644 --- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala @@ -56,13 +56,9 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with // Do some Metrics Registry cleanup by removing the metrics that this test checks. // This is a test workaround to the issue that prior harness runs may have left a populated registry. // see https://issues.apache.org/jira/browse/KAFKA-4605 - for (m <- (testedMetrics)) { - Metrics.defaultRegistry.allMetrics.asScala - .filterKeys(k => k.getName.endsWith(m)) - .headOption match { - case Some(e) => Metrics.defaultRegistry.removeMetric(e._1) - case None => - } + for (m <- testedMetrics) { + val metricName = Metrics.defaultRegistry.allMetrics.asScala.keys.find(_.getName.endsWith(m)) + metricName.foreach(Metrics.defaultRegistry.removeMetric) } super.setUp
