KAFKA-921; Expose max lag and min fetch rate mbeans for consumers and replica 
fetchers; reviewed by Jun Rao.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4850519a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4850519a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4850519a

Branch: refs/heads/trunk
Commit: 4850519a2c32b3ee882eb79375ba77de8d0488f7
Parents: 1caae2c
Author: Joel Koshy <jjko...@gmail.com>
Authored: Fri May 31 10:43:28 2013 -0700
Committer: Joel Koshy <jjko...@gmail.com>
Committed: Fri May 31 10:43:28 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala |  3 +-
 .../kafka/server/AbstractFetcherManager.scala   | 34 +++++++++++++++++++-
 .../kafka/server/AbstractFetcherThread.scala    |  2 +-
 .../kafka/server/ReplicaFetcherManager.scala    |  3 +-
 4 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 96bd886..3e497b9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -38,7 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
-        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
+        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
+                                       config.groupId, 1) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]

http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 4269219..15b7bd3 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,13 +20,45 @@ package kafka.server
 import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.cluster.Broker
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
-abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
+abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
+  extends Logging with KafkaMetricsGroup {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
+  newGauge(
+    metricPrefix + "-MaxLag",
+    new Gauge[Long] {
+      // current max lag across all fetchers/topics/partitions
+      def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
+        fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => {
+          curMaxThread.max(fetcherLagStatsEntry._2.lag)
+        }).max(curMaxAll)
+      })
+    }
+  )
+
+  newGauge(
+    metricPrefix + "-MinFetchRate",
+    {
+      new Gauge[Double] {
+        // current min fetch rate across all fetchers/topics/partitions
+        def value = {
+          val headRate: Double =
+            fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
+
+          fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
+            fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
+          })
+        }
+      }
+    }
+  )
+
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
     (topic.hashCode() + 31 * partitionId) % numFetchers
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 48100f4..b7cbb98 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -221,7 +221,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
 
 class FetcherLagStats(metricId: ClientIdAndBroker) {
   private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
-  private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+  val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
     stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 7f775ec..351dbba 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -20,7 +20,8 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
+                                       "Replica", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)

Reply via email to