This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2ef6ee2 KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668) 2ef6ee2 is described below commit 2ef6ee2338178c7501f5bd4c7cce5f4cea9d3e17 Author: gitlw <lucasatu...@gmail.com> AuthorDate: Thu Mar 29 22:08:28 2018 -0700 KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668) Reviewed by Jun Rao <jun...@gmail.com> --- .../scala/kafka/controller/ControllerContext.scala | 90 ++++++++++++---- .../scala/kafka/controller/KafkaController.scala | 117 ++++++++++----------- .../kafka/controller/PartitionStateMachine.scala | 2 +- .../kafka/controller/ReplicaStateMachine.scala | 7 +- .../kafka/controller/TopicDeletionManager.scala | 3 +- .../controller/PartitionStateMachineTest.scala | 16 +-- .../kafka/controller/ReplicaStateMachineTest.scala | 12 +-- 7 files changed, 148 insertions(+), 99 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 541bce8..f4671cf 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -31,14 +31,46 @@ class ControllerContext { var epoch: Int = KafkaController.InitialControllerEpoch - 1 var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1 var allTopics: Set[String] = Set.empty - var partitionReplicaAssignment: mutable.Map[TopicPartition, Seq[Int]] = mutable.Map.empty - var partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty + private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty + val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty private var liveBrokersUnderlying: Set[Broker] = Set.empty private var liveBrokerIdsUnderlying: Set[Int] = Set.empty + def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] = { + partitionReplicaAssignmentUnderlying.getOrElse(topicPartition.topic, mutable.Map.empty) + .getOrElse(topicPartition.partition, Seq.empty) + } + + private def clearTopicsState(): Unit = { + allTopics = Set.empty + partitionReplicaAssignmentUnderlying.clear() + partitionLeadershipInfo.clear() + partitionsBeingReassigned.clear() + replicasOnOfflineDirs.clear() + } + + def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = { + partitionReplicaAssignmentUnderlying.getOrElseUpdate(topicPartition.topic, mutable.Map.empty) + .put(topicPartition.partition, newReplicas) + } + + def partitionReplicaAssignmentForTopic(topic : String): Map[TopicPartition, Seq[Int]] = { + partitionReplicaAssignmentUnderlying.getOrElse(topic, Map.empty).map { + case (partition, replicas) => (new TopicPartition(topic, partition), replicas) + }.toMap + } + + def allPartitions: Set[TopicPartition] = { + partitionReplicaAssignmentUnderlying.flatMap { + case (topic, topicReplicaAssignment) => topicReplicaAssignment.map { + case (partition, _) => new TopicPartition(topic, partition) + } + }.toSet + } + // setter def liveBrokers_=(brokers: Set[Broker]) { liveBrokersUnderlying = brokers @@ -53,8 +85,12 @@ class ControllerContext { def liveOrShuttingDownBrokers = liveBrokersUnderlying def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = { - partitionReplicaAssignment.collect { - case (topicPartition, replicas) if replicas.contains(brokerId) => topicPartition + partitionReplicaAssignmentUnderlying.flatMap { + case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter { + case (_, replicas) => replicas.contains(brokerId) + }.map { + case (partition, _) => new TopicPartition(topic, partition) + } }.toSet } @@ -68,22 +104,26 @@ class ControllerContext { def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = { brokerIds.flatMap { brokerId => - partitionReplicaAssignment.collect { case (topicPartition, replicas) if replicas.contains(brokerId) => - PartitionAndReplica(topicPartition, brokerId) + partitionReplicaAssignmentUnderlying.flatMap { + case (topic, topicReplicaAssignment) => topicReplicaAssignment.collect { + case (partition, replicas) if replicas.contains(brokerId) => + PartitionAndReplica(new TopicPartition(topic, partition), brokerId) + } } - }.toSet + } } def replicasForTopic(topic: String): Set[PartitionAndReplica] = { - partitionReplicaAssignment - .filter { case (topicPartition, _) => topicPartition.topic == topic } - .flatMap { case (topicPartition, replicas) => - replicas.map(PartitionAndReplica(topicPartition, _)) - }.toSet + partitionReplicaAssignmentUnderlying.getOrElse(topic, mutable.Map.empty).flatMap { + case (partition, replicas) => replicas.map(r => PartitionAndReplica(new TopicPartition(topic, partition), r)) + }.toSet } - def partitionsForTopic(topic: String): collection.Set[TopicPartition] = - partitionReplicaAssignment.keySet.filter(topicPartition => topicPartition.topic == topic) + def partitionsForTopic(topic: String): collection.Set[TopicPartition] = { + partitionReplicaAssignmentUnderlying.getOrElse(topic, mutable.Map.empty).map { + case (partition, _) => new TopicPartition(topic, partition) + }.toSet + } def allLiveReplicas(): Set[PartitionAndReplica] = { replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica => @@ -98,10 +138,24 @@ class ControllerContext { } } - def removeTopic(topic: String) = { - partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicPartition, _) => topicPartition.topic != topic } - partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicPartition, _) => topicPartition.topic != topic } - allTopics -= topic + def resetContext(): Unit = { + if (controllerChannelManager != null) { + controllerChannelManager.shutdown() + controllerChannelManager = null + } + shuttingDownBrokerIds.clear() + epoch = 0 + epochZkVersion = 0 + clearTopicsState() + liveBrokers = Set.empty } + def removeTopic(topic: String): Unit = { + allTopics -= topic + partitionReplicaAssignmentUnderlying.remove(topic) + partitionLeadershipInfo.foreach { + case (topicPartition, _) if topicPartition.topic == topic => partitionLeadershipInfo.remove(topicPartition) + case _ => + } + } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 2cb3f7c..4778a7a 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -309,7 +309,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti replicaStateMachine.shutdown() zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path) - resetControllerContext() + controllerContext.resetContext() info("Resigned") } @@ -569,28 +569,28 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } val newReplicas = reassignedPartitionContext.newReplicas val topic = tp.topic - controllerContext.partitionReplicaAssignment.get(tp) match { - case Some(assignedReplicas) => - if (assignedReplicas == newReplicas) { - info(s"Partition $tp to be reassigned is already assigned to replicas " + - s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.") - removePartitionFromReassignedPartitions(tp) - } else { - try { - info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}") - // first register ISR change listener - reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient) - // mark topic ineligible for deletion for the partitions being reassigned - topicDeletionManager.markTopicIneligibleForDeletion(Set(topic)) - onPartitionReassignment(tp, reassignedPartitionContext) - } catch { - case e: Throwable => - error(s"Error completing reassignment of partition $tp", e) - // remove the partition from the admin path to unblock the admin client - removePartitionFromReassignedPartitions(tp) - } + val assignedReplicas = controllerContext.partitionReplicaAssignment(tp) + if (assignedReplicas.nonEmpty) { + if (assignedReplicas == newReplicas) { + info(s"Partition $tp to be reassigned is already assigned to replicas " + + s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.") + removePartitionFromReassignedPartitions(tp) + } else { + try { + info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}") + // first register ISR change listener + reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient) + // mark topic ineligible for deletion for the partitions being reassigned + topicDeletionManager.markTopicIneligibleForDeletion(Set(topic)) + onPartitionReassignment(tp, reassignedPartitionContext) + } catch { + case e: Throwable => + error(s"Error completing reassignment of partition $tp", e) + // remove the partition from the admin path to unblock the admin client + removePartitionFromReassignedPartitions(tp) } - case None => + } + } else { error(s"Ignoring request to reassign partition $tp that doesn't exist.") removePartitionFromReassignedPartitions(tp) } @@ -643,8 +643,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq) - controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet) - controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch] + zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach { + case (topicPartition, assignedReplicas) => controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas) + } + controllerContext.partitionLeadershipInfo.clear() controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int] // register broker modifications handlers registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id)) @@ -662,10 +664,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection // check if they are already completed or topic was deleted val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition => - val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition) - val topicDeleted = replicasOpt.isEmpty + val replicas = controllerContext.partitionReplicaAssignment(partition) + val topicDeleted = replicas.isEmpty val successful = - if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false + if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicas.head else false successful || topicDeleted } val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection @@ -678,21 +680,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti pendingPreferredReplicaElections } - private def resetControllerContext(): Unit = { - if (controllerContext.controllerChannelManager != null) { - controllerContext.controllerChannelManager.shutdown() - controllerContext.controllerChannelManager = null - } - controllerContext.shuttingDownBrokerIds.clear() - controllerContext.epoch = 0 - controllerContext.epochZkVersion = 0 - controllerContext.allTopics = Set.empty - controllerContext.partitionReplicaAssignment.clear() - controllerContext.partitionLeadershipInfo.clear() - controllerContext.partitionsBeingReassigned.clear() - controllerContext.liveBrokers = Set.empty - } - private def initializePartitionReassignment() { // read the partitions being reassigned from zookeeper path /admin/reassign_partitions val partitionsBeingReassigned = zkClient.getPartitionReassignment @@ -706,9 +693,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = { val topicsToBeDeleted = zkClient.getTopicDeletions.toSet - val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) => - replicas.exists(r => !controllerContext.isReplicaOnline(r, partition)) - }.keySet.map(_.topic) + val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => { + val replicasForTopic = controllerContext.replicasForTopic(topic) + replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition)) + }} val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic) val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}") @@ -722,7 +710,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti controllerContext.controllerChannelManager.startup() } - private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) { + private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq) { val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions) leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) => controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) @@ -742,7 +730,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr // request to the current or new leader. This will prevent it from adding the old replicas to the ISR val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicPartition) - controllerContext.partitionReplicaAssignment.put(topicPartition, reassignedReplicas) + controllerContext.updatePartitionReplicaAssignment(topicPartition, reassignedReplicas) if (!reassignedPartitionContext.newReplicas.contains(currentLeader)) { info(s"Leader $currentLeader for partition $topicPartition being reassigned, " + s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader") @@ -778,14 +766,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti private def updateAssignedReplicasForPartition(partition: TopicPartition, replicas: Seq[Int]) { - val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic) - partitionsAndReplicasForThisTopic.put(partition, replicas) - val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap) + controllerContext.updatePartitionReplicaAssignment(partition, replicas) + val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, controllerContext.partitionReplicaAssignmentForTopic(partition.topic)) setDataResponse.resultCode match { case Code.OK => info(s"Updated assigned replicas for partition $partition being reassigned to ${replicas.mkString(",")}") // update the assigned replica list after a successful zookeeper write - controllerContext.partitionReplicaAssignment.put(partition, replicas) + controllerContext.updatePartitionReplicaAssignment(partition, replicas) case Code.NONODE => throw new IllegalStateException(s"Topic ${partition.topic} doesn't exist") case _ => throw new KafkaException(setDataResponse.resultException.get) } @@ -971,9 +958,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti private def checkAndTriggerAutoLeaderRebalance(): Unit = { trace("Checking need to trigger auto leader balancing") val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicPartition, Seq[Int]]] = - controllerContext.partitionReplicaAssignment.filterNot { case (tp, _) => - topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) - }.groupBy { case (_, assignedReplicas) => assignedReplicas.head } + controllerContext.allPartitions.filterNot { + tp => topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) + }.map { tp => + (tp, controllerContext.partitionReplicaAssignment(tp) ) + }.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head } + debug(s"Preferred replicas by broker $preferredReplicasForTopicsByBrokers") // for each broker, check if a preferred replica election needs to be triggered @@ -1155,7 +1145,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti if (!isActive) { 0 } else { - controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) => + controllerContext.allPartitions.count { topicPartition => + val replicas = controllerContext.partitionReplicaAssignment(topicPartition) val preferredReplica = replicas.head val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition) leadershipInfo.map(_.leaderAndIsr.leader != preferredReplica).getOrElse(false) && @@ -1273,9 +1264,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti registerPartitionModificationsHandlers(newTopics.toSeq) val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics) - controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => - !deletedTopics.contains(p._1.topic)) - controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment + deletedTopics.foreach(controllerContext.removeTopic) + addedPartitionReplicaAssignment.foreach { + case (topicAndPartition, newReplicas) => controllerContext.updatePartitionReplicaAssignment(topicAndPartition, newReplicas) + } info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " + s"[$addedPartitionReplicaAssignment]") if (addedPartitionReplicaAssignment.nonEmpty) @@ -1315,8 +1307,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti override def process(): Unit = { if (!isActive) return val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)) - val partitionsToBeAdded = partitionReplicaAssignment.filter(p => - !controllerContext.partitionReplicaAssignment.contains(p._1)) + val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) => + controllerContext.partitionReplicaAssignment(topicPartition).isEmpty + } if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) if (partitionsToBeAdded.nonEmpty) { warn("Skipping adding partitions %s for topic %s since it is currently being deleted" @@ -1330,7 +1323,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti else { if (partitionsToBeAdded.nonEmpty) { info(s"New partitions to be added $partitionsToBeAdded") - controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded + partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) => + controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas) + } onNewPartitionCreation(partitionsToBeAdded.keySet) } } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 2e27272..74bc59f 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -76,7 +76,7 @@ class PartitionStateMachine(config: KafkaConfig, * zookeeper */ private def initializePartitionState() { - for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) { + for (topicPartition <- controllerContext.allPartitions) { // check if leader and isr path exists for partition. If not, then it is in NEW state controllerContext.partitionLeadershipInfo.get(topicPartition) match { case Some(currentLeaderIsrAndEpoch) => diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 85af764..a2d04e6 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -80,7 +80,8 @@ class ReplicaStateMachine(config: KafkaConfig, * in zookeeper */ private def initializeReplicaState() { - controllerContext.partitionReplicaAssignment.foreach { case (partition, replicas) => + controllerContext.allPartitions.foreach { partition => + val replicas = controllerContext.partitionReplicaAssignment(partition) replicas.foreach { replicaId => val partitionAndReplica = PartitionAndReplica(partition, replicaId) if (controllerContext.isReplicaOnline(replicaId, partition)) @@ -181,7 +182,7 @@ class ReplicaStateMachine(config: KafkaConfig, case NewReplica => val assignment = controllerContext.partitionReplicaAssignment(partition) if (!assignment.contains(replicaId)) { - controllerContext.partitionReplicaAssignment.put(partition, assignment :+ replicaId) + controllerContext.updatePartitionReplicaAssignment(partition, assignment :+ replicaId) } case _ => controllerContext.partitionLeadershipInfo.get(partition) match { @@ -237,7 +238,7 @@ class ReplicaStateMachine(config: KafkaConfig, case NonExistentReplica => validReplicas.foreach { replica => val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicPartition) - controllerContext.partitionReplicaAssignment.put(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica)) + controllerContext.updatePartitionReplicaAssignment(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica)) logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), NonExistentReplica) replicaState.remove(replica) } diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index b1d8394..6e14551 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -255,9 +255,8 @@ class TopicDeletionManager(controller: KafkaController, // send update metadata so that brokers stop serving data for topics to be deleted val partitions = topics.flatMap(controllerContext.partitionsForTopic) controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions) - val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic) topics.foreach { topic => - onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet) + onPartitionDeletion(controllerContext.partitionsForTopic(topic)) } } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index 32e0d43..52f4599 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -80,7 +80,7 @@ class PartitionStateMachineTest extends JUnitSuite { @Test def testNewPartitionToOnlinePartitionTransition(): Unit = { controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0)) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) partitionState.put(partition, NewPartition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) @@ -98,7 +98,7 @@ class PartitionStateMachineTest extends JUnitSuite { @Test def testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates(): Unit = { controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0)) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) partitionState.put(partition, NewPartition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) @@ -114,7 +114,7 @@ class PartitionStateMachineTest extends JUnitSuite { @Test def testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates(): Unit = { controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0)) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) partitionState.put(partition, NewPartition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) @@ -144,7 +144,7 @@ class PartitionStateMachineTest extends JUnitSuite { @Test def testOnlinePartitionToOnlineTransition(): Unit = { controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0)) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) partitionState.put(partition, OnlinePartition) val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId)) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) @@ -175,7 +175,7 @@ class PartitionStateMachineTest extends JUnitSuite { val otherBrokerId = brokerId + 1 controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0), TestUtils.createBroker(otherBrokerId, "host", 0)) controllerContext.shuttingDownBrokerIds.add(brokerId) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId, otherBrokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId, otherBrokerId)) partitionState.put(partition, OnlinePartition) val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId, otherBrokerId)) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) @@ -226,7 +226,7 @@ class PartitionStateMachineTest extends JUnitSuite { @Test def testOfflinePartitionToOnlinePartitionTransition(): Unit = { controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0)) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) partitionState.put(partition, OfflinePartition) val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId)) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) @@ -257,7 +257,7 @@ class PartitionStateMachineTest extends JUnitSuite { @Test def testOfflinePartitionToOnlinePartitionTransitionZkUtilsExceptionFromStateLookup(): Unit = { controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0)) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) partitionState.put(partition, OfflinePartition) val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId)) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) @@ -278,7 +278,7 @@ class PartitionStateMachineTest extends JUnitSuite { @Test def testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup(): Unit = { controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0)) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) partitionState.put(partition, OfflinePartition) val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId)) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala index 4d38aac..6a961a5 100644 --- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala @@ -104,7 +104,7 @@ class ReplicaStateMachineTest extends JUnitSuite { @Test def testNewReplicaToOnlineReplicaTransition(): Unit = { replicaState.put(replica, NewReplica) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) replicaStateMachine.handleStateChanges(replicas, OnlineReplica) assertEquals(OnlineReplica, replicaState(replica)) } @@ -150,7 +150,7 @@ class ReplicaStateMachineTest extends JUnitSuite { @Test def testOnlineReplicaToOnlineReplicaTransition(): Unit = { replicaState.put(replica, OnlineReplica) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) @@ -168,7 +168,7 @@ class ReplicaStateMachineTest extends JUnitSuite { val otherBrokerId = brokerId + 1 val replicaIds = List(brokerId, otherBrokerId) replicaState.put(replica, OnlineReplica) - controllerContext.partitionReplicaAssignment.put(partition, replicaIds) + controllerContext.updatePartitionReplicaAssignment(partition, replicaIds) val leaderAndIsr = LeaderAndIsr(brokerId, replicaIds) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) @@ -225,7 +225,7 @@ class ReplicaStateMachineTest extends JUnitSuite { @Test def testOfflineReplicaToOnlineReplicaTransition(): Unit = { replicaState.put(replica, OfflineReplica) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) @@ -299,7 +299,7 @@ class ReplicaStateMachineTest extends JUnitSuite { @Test def testReplicaDeletionSuccessfulToNonexistentReplicaTransition(): Unit = { replicaState.put(replica, ReplicaDeletionSuccessful) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) replicaStateMachine.handleStateChanges(replicas, NonExistentReplica) assertEquals(Seq.empty, controllerContext.partitionReplicaAssignment(partition)) assertEquals(None, replicaState.get(replica)) @@ -343,7 +343,7 @@ class ReplicaStateMachineTest extends JUnitSuite { @Test def testReplicaDeletionIneligibleToOnlineReplicaTransition(): Unit = { replicaState.put(replica, ReplicaDeletionIneligible) - controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId)) + controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId)) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) -- To stop receiving notification emails like this one, please contact jun...@apache.org.