KAFKA-921; Expose max lag and min fetch rate MBeans for consumers and replica fetchers; reviewed by Jun Rao.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4850519a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4850519a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4850519a Branch: refs/heads/trunk Commit: 4850519a2c32b3ee882eb79375ba77de8d0488f7 Parents: 1caae2c Author: Joel Koshy <jjko...@gmail.com> Authored: Fri May 31 10:43:28 2013 -0700 Committer: Joel Koshy <jjko...@gmail.com> Committed: Fri May 31 10:43:28 2013 -0700 ---------------------------------------------------------------------- .../kafka/consumer/ConsumerFetcherManager.scala | 3 +- .../kafka/server/AbstractFetcherManager.scala | 34 +++++++++++++++++++- .../kafka/server/AbstractFetcherThread.scala | 2 +- .../kafka/server/ReplicaFetcherManager.scala | 3 +- 4 files changed, 38 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 96bd886..3e497b9 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -38,7 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger class ConsumerFetcherManager(private val consumerIdString: String, private val config: ConsumerConfig, private val zkClient : ZkClient) - extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) { + extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), + config.groupId, 1) { private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null private var cluster: Cluster = null private val 
noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition] http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 4269219..15b7bd3 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -20,13 +20,45 @@ package kafka.server import scala.collection.mutable import kafka.utils.Logging import kafka.cluster.Broker +import kafka.metrics.KafkaMetricsGroup +import com.yammer.metrics.core.Gauge -abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging { +abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1) + extends Logging with KafkaMetricsGroup { // map of (source brokerid, fetcher Id per source broker) => fetcher private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread] private val mapLock = new Object this.logIdent = "[" + name + "] " + newGauge( + metricPrefix + "-MaxLag", + new Gauge[Long] { + // current max lag across all fetchers/topics/partitions + def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => { + fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => { + curMaxThread.max(fetcherLagStatsEntry._2.lag) + }).max(curMaxAll) + }) + } + ) + + newGauge( + metricPrefix + "-MinFetchRate", + { + new Gauge[Double] { + // current min fetch rate across all fetchers/topics/partitions + def value = { + val headRate: Double = + fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0) + + fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => { + 
fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll) + }) + } + } + } + ) + private def getFetcherId(topic: String, partitionId: Int) : Int = { (topic.hashCode() + 31 * partitionId) % numFetchers } http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 48100f4..b7cbb98 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -221,7 +221,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet class FetcherLagStats(metricId: ClientIdAndBroker) { private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k) - private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory)) + val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory)) def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = { stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId)) http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 7f775ec..351dbba 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -20,7 +20,8 @@ package kafka.server import kafka.cluster.Broker class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: 
ReplicaManager) - extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) { + extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, + "Replica", brokerConfig.numReplicaFetchers) { override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)