Updated Branches: refs/heads/0.8 2f4bfc645 -> fd94251d8
ConsumerOffsetChecker does not work with 0.8; kafka-685; patched by Maxime Brugidou; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fd94251d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fd94251d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fd94251d Branch: refs/heads/0.8 Commit: fd94251d89180cdb6cf815cf2d6f568e15b85c59 Parents: 2f4bfc6 Author: Jun Rao <jun...@gmail.com> Authored: Tue Jan 8 13:43:53 2013 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Tue Jan 8 13:43:53 2013 -0800 ---------------------------------------------------------------------- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 65 +++++++-------- 1 files changed, 31 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/fd94251d/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index db9acc9..3161435 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -29,66 +29,59 @@ import scala.collection._ object ConsumerOffsetChecker extends Logging { - private val consumerMap: mutable.Map[String, Option[SimpleConsumer]] = mutable.Map() + private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() - private val BidPidPattern = """(\d+)-(\d+)""".r + private val BrokerIpPattern = """^([^:]+):(\d+).*$""".r + // e.g., 127.0.0.1:9092:9999 (with JMX port) - private val BrokerIpPattern = """.*:([^:]+):(\d+$)""".r - // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092 - // e.g., host.domain.com-1315436360737:host.domain.com:9092 - - private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = { + private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = { val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1 val consumer = brokerInfo match { - case BrokerIpPattern(ip, port) => + case Some(BrokerIpPattern(ip, port)) => Some(new SimpleConsumer(ip, port.toInt, 10000, 100000, "ConsumerOffsetChecker")) case _ => - error("Could not parse broker info %s".format(brokerInfo)) + error("Could not parse broker info %s with regex %s".format(brokerInfo, BrokerIpPattern.toString())) None } consumer } private def processPartition(zkClient: ZkClient, - group: String, topic: String, bidPid: String) { + group: String, topic: String, pid: Int) { val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s". - format(group, topic, bidPid))._1.toLong + format(group, topic, pid))._1.toLong val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s". - format(group, topic, bidPid))._1 - println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid)) - println("%20s%s".format("Owner = ", owner)) - println("%20s%d".format("Consumer offset = ", offset)) - println("%20s%,d (%,.2fG)".format("= ", offset, offset / math.pow(1024, 3))) - - bidPid match { - case BidPidPattern(bid, pid) => - val consumerOpt = consumerMap.getOrElseUpdate( - bid, getConsumer(zkClient, bid)) + format(group, topic, pid))._1 + + ZkUtils.getLeaderForPartition(zkClient, topic, pid) match { + case Some(bid) => + val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid)) consumerOpt match { case Some(consumer) => - val topicAndPartition = TopicAndPartition(topic, pid.toInt) + val topicAndPartition = TopicAndPartition(topic, pid) val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - println("%20s%d".format("Log size = ", logSize)) - println("%20s%,d (%,.2fG)".format("= ", logSize, logSize / math.pow(1024, 3))) val lag = logSize - offset - println("%20s%d".format("Consumer lag = ", lag)) - println("%20s%,d (%,.2fG)".format("= ", lag, lag / math.pow(1024, 3))) - println() + println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offset, logSize, lag, + owner match {case Some(ownerStr) => ownerStr case None => "none"})) + consumer.close() case None => // ignore } - case _ => - error("Could not parse broker/partition pair %s".format(bidPid)) + case None => + error("No broker for partition %s - %s".format(topic, pid)) } } private def processTopic(zkClient: ZkClient, group: String, topic: String) { - val bidsPids = ZkUtils.getChildrenParentMayNotExist( - zkClient, "/consumers/%s/offsets/%s".format(group, topic)).toList - bidsPids.sorted.foreach { - bidPid => processPartition(zkClient, group, topic, bidPid) + val pidMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic)) + pidMap.get(topic) match { + case Some(pids) => + pids.sorted.foreach { + pid => processPartition(zkClient, group, topic, pid) + } + case None => // ignore } } @@ -112,6 +105,7 @@ object ConsumerOffsetChecker extends Logging { withRequiredArg().ofType(classOf[String]) val groupOpt = parser.accepts("group", "Consumer group."). withRequiredArg().ofType(classOf[String]) + parser.accepts("broker-info", "Print broker info") parser.accepts("help", "Print this message.") val options = parser.parse(args : _*) @@ -147,11 +141,14 @@ object ConsumerOffsetChecker extends Logging { debug("zkConnect = %s; topics = %s; group = %s".format( zkConnect, topicList.toString(), group)) + println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) topicList.sorted.foreach { topic => processTopic(zkClient, group, topic) } - printBrokerInfo() + if (options.has("broker-info")) + printBrokerInfo(); + } finally { for (consumerOpt <- consumerMap.values) {