Repository: kafka Updated Branches: refs/heads/trunk c6f08b609 -> 780225506
KAFKA-1576: Make delete topic command a little more user-friendly. Patch from Gwen Shapira. Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78022550 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78022550 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78022550 Branch: refs/heads/trunk Commit: 780225506d89b587d3974615b8444ef2b62122d5 Parents: c6f08b6 Author: Jay Kreps <[email protected]> Authored: Mon Aug 18 11:44:42 2014 -0700 Committer: Jay Kreps <[email protected]> Committed: Mon Aug 18 11:44:42 2014 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/admin/TopicCommand.scala | 23 ++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/78022550/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 003a09c..b3f2e82 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -19,8 +19,10 @@ package kafka.admin import joptsimple._ import java.util.Properties +import kafka.admin.AdminOperationException import kafka.utils._ import org.I0Itec.zkclient.ZkClient +import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ import scala.collection.JavaConversions._ import kafka.cluster.Broker @@ -121,14 +123,31 @@ object TopicCommand { def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) - for(topic <- topics) + for(topic <- topics) { + if (ZkUtils.pathExists(zkClient,ZkUtils.getDeleteTopicPath(topic))) { + println("%s - marked for deletion".format(topic)) + } else { println(topic) + } + } } def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) + if (topics.length == 0) { + println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt))) + } topics.foreach { topic => - ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) + try { + ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) + println("Topic %s is marked for deletion.".format(topic)) + println("Note: This will have no impact if delete.topic.enable is not set to true.") + } catch { + case e: ZkNodeExistsException => + println("Topic %s is already marked for deletion.".format(topic)) + case e2: Throwable => + throw new AdminOperationException("Error while deleting topic %s".format(topic)) + } } }
