Repository: kafka Updated Branches: refs/heads/0.11.0 b9f7aa52d -> 45b326d93
KAFKA-4291; TopicCommand --describe should show topics marked for deletion Developed with edoardocomar Author: Mickael Maison <[email protected]> Reviewers: Vahid Hashemian <[email protected]>, Ismael Juma <[email protected]> Closes #2011 from mimaison/KAFKA-4291 (cherry picked from commit a598c4d26fb06acb455e14e84468306dfa6e1c8b) Signed-off-by: Ismael Juma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45b326d9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45b326d9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45b326d9 Branch: refs/heads/0.11.0 Commit: 45b326d9365c767378018357ca5993edceffdace Parents: b9f7aa5 Author: Mickael Maison <[email protected]> Authored: Sat Jun 3 10:07:56 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat Jun 3 10:31:15 2017 +0100 ---------------------------------------------------------------------- .../main/scala/kafka/admin/TopicCommand.scala | 18 +++++++--- core/src/main/scala/kafka/utils/ZkUtils.scala | 4 +++ .../unit/kafka/admin/DeleteTopicTest.scala | 6 ++-- .../unit/kafka/admin/TopicCommandTest.scala | 37 ++++++++++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 12 ++++++- 5 files changed, 68 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index efb9237..9e516b0 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -153,7 +153,7 @@ object TopicCommand extends Logging { def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) { val topics = getTopics(zkUtils, opts) for(topic <- topics) { - if (zkUtils.pathExists(getDeleteTopicPath(topic))) { + if (zkUtils.isTopicMarkedForDeletion(topic)) { println("%s - marked for deletion".format(topic)) } else { println(topic) @@ -199,14 +199,17 @@ object TopicCommand extends Logging { case Some(topicPartitionAssignment) => val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions val describePartitions: Boolean = !reportOverriddenConfigs - val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) + val sortedPartitions = topicPartitionAssignment.toSeq.sortBy(_._1) + val markedForDeletion = zkUtils.isTopicMarkedForDeletion(topic) if (describeConfigs) { val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala if (!reportOverriddenConfigs || configs.nonEmpty) { val numPartitions = topicPartitionAssignment.size val replicationFactor = topicPartitionAssignment.head._2.size - println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s" - .format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + val configsAsString = configs.map { case (k, v) => s"$k=$v" }.mkString(",") + val markedForDeletionString = if (markedForDeletion) "\tMarkedForDeletion:true" else "" + println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s%s" + .format(topic, numPartitions, replicationFactor, configsAsString, markedForDeletionString)) } } if (describePartitions) { @@ -216,11 +219,16 @@ object TopicCommand extends Logging { if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) || (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) || (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) { + + val markedForDeletionString = + if (markedForDeletion && !describeConfigs) "\tMarkedForDeletion: true" else "" print("\tTopic: " + topic) print("\tPartition: " + partitionId) print("\tLeader: " + (if(leader.isDefined) leader.get else "none")) print("\tReplicas: " + assignedReplicas.mkString(",")) - println("\tIsr: " + inSyncReplicas.mkString(",")) + print("\tIsr: " + inSyncReplicas.mkString(",")) + print(markedForDeletionString) + println() } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 899b7c3..76b9569 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -692,6 +692,10 @@ class ZkUtils(val zkClient: ZkClient, zkClient.exists(path) } + def isTopicMarkedForDeletion(topic: String): Boolean = { + pathExists(getDeleteTopicPath(topic)) + } + def getCluster(): Cluster = { val cluster = new Cluster val nodes = getChildrenParentMayNotExist(BrokerIdsPath) http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 15018f5..7df3693 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -66,7 +66,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { servers.filter(s => s.config.brokerId != follower.config.brokerId) .forall(_.getLogManager().getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.") // ensure topic deletion is halted - TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)), + TestUtils.waitUntilTrue(() => zkUtils.isTopicMarkedForDeletion(topic), "Admin path /admin/delete_topic/test path deleted even when a follower replica is down") // restart follower replica follower.startup() @@ -90,7 +90,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { controller.shutdown() // ensure topic deletion is halted - TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)), + TestUtils.waitUntilTrue(() => zkUtils.isTopicMarkedForDeletion(topic), "Admin path /admin/delete_topic/test path deleted even when a replica is down") controller.startup() @@ -310,7 +310,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness { servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false) // mark the topic for deletion AdminUtils.deleteTopic(zkUtils, "test") - TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)), + TestUtils.waitUntilTrue(() => !zkUtils.isTopicMarkedForDeletion(topic), "Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic)) // verify that topic test is untouched assertTrue(servers.forall(_.getLogManager().getLog(topicPartition).isDefined)) http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index ad6cfa5..e72a4e3 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -187,4 +187,41 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT } checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor) } + + @Test + def testDescribeAndListTopicsMarkedForDeletion() { + val brokers = List(0) + val topic = "testtopic" + val markedForDeletionDescribe = "MarkedForDeletion" + val markedForDeletionList = "marked for deletion" + TestUtils.createBrokersInZk(zkUtils, brokers) + + val createOpts = new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", topic)) + TopicCommand.createTopic(zkUtils, createOpts) + + // delete the broker first, so when we attempt to delete the topic it gets into "marked for deletion" + TestUtils.deleteBrokersInZk(zkUtils, brokers) + TopicCommand.deleteTopic(zkUtils, new TopicCommandOptions(Array("--topic", topic))) + + // Test describe topics + def describeTopicsWithConfig() { + TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe"))) + } + val outputWithConfig = TestUtils.grabConsoleOutput(describeTopicsWithConfig) + assertTrue(outputWithConfig.contains(topic) && outputWithConfig.contains(markedForDeletionDescribe)) + + def describeTopicsNoConfig() { + TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe", "--unavailable-partitions"))) + } + val outputNoConfig = TestUtils.grabConsoleOutput(describeTopicsNoConfig) + assertTrue(outputNoConfig.contains(topic) && outputNoConfig.contains(markedForDeletionDescribe)) + + // Test list topics + def listTopics() { + TopicCommand.listTopics(zkUtils, new TopicCommandOptions(Array("--list"))) + } + val output = TestUtils.grabConsoleOutput(listTopics) + assertTrue(output.contains(topic) && output.contains(markedForDeletionList)) + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 572de9b..a0f4762 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1120,7 +1120,7 @@ object TestUtils extends Logging { def verifyTopicDeletion(zkUtils: ZkUtils, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) { val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) // wait until admin path for delete topic is deleted, signaling completion of topic deletion - TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)), + TestUtils.waitUntilTrue(() => !zkUtils.isTopicMarkedForDeletion(topic), "Admin path /admin/delete_topic/%s path not deleted even after a replica is restarted".format(topic)) TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getTopicPath(topic)), "Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic, topic)) @@ -1424,6 +1424,16 @@ object TestUtils extends Logging { } } + /** + * Capture the console output during the execution of the provided function. + */ + def grabConsoleOutput(f: => Unit) : String = { + val out = new ByteArrayOutputStream + try scala.Console.withOut(out)(f) + finally scala.Console.out.flush + out.toString + } + } class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
