[ https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412336#comment-16412336 ]
ASF GitHub Bot commented on KAFKA-6612:
---------------------------------------

becketqin closed pull request #4666: KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion
URL: https://github.com/apache/kafka/pull/4666

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a8707ad887d..c24c5762709 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1308,14 +1308,31 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   case class PartitionModifications(topic: String) extends ControllerEvent {
     override def state: ControllerState = ControllerState.TopicChange
 
+    def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment : immutable.Map[TopicPartition, Seq[Int]]): Unit = {
+      info("Restoring the partition replica assignment for topic %s".format(topic))
+
+      val existingPartitions = zkClient.getChildren(TopicPartitionsZNode.path(topic))
+      val existingPartitionReplicaAssignment = newPartitionReplicaAssignment.filter(p =>
+        existingPartitions.contains(p._1.partition.toString))
+
+      zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment)
+    }
+
     override def process(): Unit = {
       if (!isActive) return
       val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
       val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
         !controllerContext.partitionReplicaAssignment.contains(p._1))
       if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
-        error(s"Skipping adding partitions ${partitionsToBeAdded.map(_._1.partition).mkString(",")} for topic $topic " +
-          "since it is currently being deleted")
+        if (partitionsToBeAdded.nonEmpty) {
+          warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
+            .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+
+          restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
+        } else {
+          // This can happen if existing partition replica assignment are restored to prevent increasing partition count during topic deletion
+          info("Ignoring partition change during topic deletion as no new partitions are added")
+        }
       else {
         if (partitionsToBeAdded.nonEmpty) {
           info(s"New partitions to be added $partitionsToBeAdded")
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 897cc598123..ef455d4457e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -24,7 +24,8 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import java.util.Properties
 
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaDeletionSuccessful}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
@@ -145,6 +146,86 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
   }
 
+  private def getController() : (KafkaServer, Int) = {
+    val controllerId = zkClient.getControllerId.getOrElse(fail("Controller doesn't exist"))
+    val controller = servers.find(s => s.config.brokerId == controllerId).get
+    (controller, controllerId)
+  }
+
+  private def ensureControllerExists() = {
+    TestUtils.waitUntilTrue(() => {
+      try {
+        getController()
+        true
+      } catch {
+        case _: Throwable => false
+      }
+    }, "Controller should eventually exist")
+  }
+
+  private def getAllReplicasFromAssignment(topic : String, assignment : Map[Int, Seq[Int]]) : Set[PartitionAndReplica] = {
+    assignment.flatMap { case (partition, replicas) =>
+      replicas.map {r => new PartitionAndReplica(new TopicPartition(topic, partition), r)}
+    }.toSet
+  }
+
+  @Test
+  def testIncreasePartitionCountDuringDeleteTopic() {
+    val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+    val topic = "test"
+    val topicPartition = new TopicPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+    // create brokers
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    this.servers = allServers
+    val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
+    // create the topic
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    // wait until replica log is created on every broker
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
+      "Replicas for topic test not created.")
+    // shutdown a broker to make sure the following topic deletion will be suspended
+    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
+    assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+    follower.shutdown()
+    // start topic deletion
+    adminZkClient.deleteTopic(topic)
+
+    // make sure deletion of all of the topic's replicas have been tried
+    ensureControllerExists()
+    val (controller, controllerId) = getController()
+    val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment)
+    TestUtils.waitUntilTrue(() => {
+      val replicasInDeletionSuccessful = controller.kafkaController.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+      val offlineReplicas = controller.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
+      allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas)
+    }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica")
+
+    // increase the partition count for topic
+    val topicCommandOptions = new TopicCommand.TopicCommandOptions(Array("--zookeeper", zkConnect, "--alter", "--topic", topic, "--partitions", "2"))
+    TopicCommand.alterTopic(zkClient, topicCommandOptions)
+
+    // trigger a controller switch now
+    val previousControllerId = controllerId
+
+    controller.shutdown()
+
+    ensureControllerExists()
+    // wait until a new controller to show up
+    TestUtils.waitUntilTrue(() => {
+      val (newController, newControllerId) = getController()
+      newControllerId != previousControllerId
+    }, "The new controller should not have the failed controller id")
+
+    // bring back the failed brokers
+    follower.startup()
+    controller.startup()
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+  }
+
+  @Test
   def testDeleteTopicDuringAddPartition() {
     val topic = "test"
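In short, when the controller sees new partitions appear in /brokers/topics/<topic> for a topic that is queued up for deletion, it now rewrites that znode back to the assignment of the partitions that already exist, instead of only logging the event. That keeps a later controller failover from picking up a leaderless partition that could never be transitioned to OfflineReplica. The new test drives exactly that sequence: it stalls the deletion by shutting down a follower, alters the partition count through the ZooKeeper-based TopicCommand, bounces the controller, and finally checks with TestUtils.verifyTopicDeletion that the topic is still fully deleted.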
> Added logic to prevent increasing partition counts during topic deletion
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-6612
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6612
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Major
>
> Problem: increasing the partition count of a topic while that topic's deletion is in progress can cause the topic to never be deleted.
> In the current code base, if a topic deletion is still in progress and the partition count is increased, the new partition and its replica assignment will be created in ZooKeeper as data of the path /brokers/topics/<topic>.
> Upon detecting the change, the controller sees that the topic is being deleted and therefore ignores the partition change, so the zk path /brokers/topics/<topic>/partitions/<partition id> will NOT be created.
> If a controller switch happens next, the added partition will be detected by the new controller and stored in controllerContext.partitionReplicaAssignment. The new controller then tries to delete the topic by first transitioning its replicas to OfflineReplica.
> However, the transition to the OfflineReplica state will NOT succeed since there is no leader for the partition. Since the only state change path for a replica to be successfully deleted is OfflineReplica -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter the OfflineReplica state means the replica can never be successfully deleted.
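To make the broken state concrete, below is a minimal, hypothetical diagnostic sketch (not part of the patch or of the Kafka code base) that reads the two ZooKeeper paths named above with the plain ZooKeeper client. It assumes a local ZooKeeper at localhost:2181 and a topic named "test" whose partition count was bumped while deletion was pending: the topic znode already carries the enlarged assignment, while no znode exists under partitions/ for the new partition id.

import org.apache.zookeeper.ZooKeeper

// Hypothetical helper, for illustration only; adjust connection string and topic name.
object StuckTopicInspector {
  def main(args: Array[String]): Unit = {
    val zk = new ZooKeeper("localhost:2181", 30000, null)
    try {
      // The topic znode's data would already list the new partition's replica assignment...
      val assignmentJson = new String(zk.getData("/brokers/topics/test", false, null), "UTF-8")
      println(s"/brokers/topics/test -> $assignmentJson")

      // ...while the children of the partitions path would still contain only the old
      // partition ids, i.e. the new partition has no state znode and hence no leader.
      val partitionIds = zk.getChildren("/brokers/topics/test/partitions", false)
      println(s"partition znodes present: $partitionIds")
    } finally {
      zk.close()
    }
  }
}

With the patch above, this mismatch should no longer arise, because the controller rolls the enlarged assignment back before a controller failover can pick it up.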