Repository: kafka Updated Branches: refs/heads/0.8.2 0b312a6b9 -> 2a1e3d451
KAFKA-1902; fix MetricName so that Yammer reporter can work correctly; patched by Jun Rao; reviewed by Manikumar Reddy, Manikumar Reddy and Joel Koshy Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a1e3d45 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a1e3d45 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a1e3d45 Branch: refs/heads/0.8.2 Commit: 2a1e3d4510e8fadb0cad0cb7290baf54aae39c23 Parents: 0b312a6 Author: Jun Rao <jun...@gmail.com> Authored: Wed Jan 28 18:44:16 2015 -0600 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Jan 28 18:44:16 2015 -0600 ---------------------------------------------------------------------- .../scala/kafka/metrics/KafkaMetricsGroup.scala | 27 ++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2a1e3d45/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index e9e4918..9e31184 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -61,9 +61,15 @@ trait KafkaMetricsGroup extends Logging { nameBuilder.append(name) } - KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName)) + val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null) + val tagsName = KafkaMetricsGroup.toMBeanName(tags) + tagsName match { + case Some(tn) => + nameBuilder.append(",").append(tn) + case None => + } - new MetricName(group, typeName, name, null, nameBuilder.toString()) + new MetricName(group, typeName, name, scope, nameBuilder.toString()) } def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) = @@ -160,6 +166,23 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { } } + private def toScope(tags: collection.Map[String, String]): Option[String] = { + val filteredTags = tags + .filter { case (tagKey, tagValue) => tagValue != ""} + if (filteredTags.nonEmpty) { + // convert dot to _ since reporters like Graphite typically use dot to represent hierarchy + val tagsString = filteredTags + .toList.sortWith((t1, t2) => t1._1 < t2._1) + .map { case (key, value) => "%s.%s".format(key, value.replaceAll("\\.", "_"))} + .mkString(".") + + Some(tagsString) + } + else { + None + } + } + def removeAllConsumerMetrics(clientId: String) { FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId) ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)