Repository: kafka Updated Branches: refs/heads/trunk 238e73978 -> f21f8f2d4
KAFKA-4982; Add listener tags to socket-server-metrics (KIP-136) Author: Edoardo Comar <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3004 from edoardocomar/KAFKA-4982 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f21f8f2d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f21f8f2d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f21f8f2d Branch: refs/heads/trunk Commit: f21f8f2d44f97eba4ce155ac9fcc8432f00cad24 Parents: 238e739 Author: Edoardo Comar <[email protected]> Authored: Sat May 13 01:54:46 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat May 13 02:42:37 2017 +0100 ---------------------------------------------------------------------- .../main/scala/kafka/network/SocketServer.scala | 24 +++++++++++--------- .../unit/kafka/network/SocketServerTest.scala | 21 +++++++++++++++++ 2 files changed, 34 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f21f8f2d/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b2a3456..fb647fa 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -21,7 +21,6 @@ import java.io.IOException import java.net._ import java.nio.channels._ import java.nio.channels.{Selector => NSelector} -import java.util import java.util.concurrent._ import java.util.concurrent.atomic._ @@ -34,7 +33,7 @@ import kafka.server.KafkaConfig import kafka.utils._ import org.apache.kafka.common.errors.InvalidRequestException import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Mode, Selectable, Selector => KSelector} +import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Selector => KSelector} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.protocol.types.SchemaException @@ -68,12 +67,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() private var connectionQuotas: ConnectionQuotas = _ - private val allMetricNames = (0 until totalProcessorThreads).map { i => - val tags = new util.HashMap[String, String]() - tags.put("networkProcessor", i.toString) - metrics.metricName("io-wait-ratio", "socket-server-metrics", tags) - } - /** * Start the socket server */ @@ -107,7 +100,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time newGauge("NetworkProcessorAvgIdlePercent", new Gauge[Double] { - def value = allMetricNames.map { metricName => + private val ioWaitRatioMetricNames = processors.map { p => + metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags) + } + + def value = ioWaitRatioMetricNames.map { metricName => Option(metrics.metric(metricName)).fold(0.0)(_.value) }.sum / totalProcessorThreads } @@ -400,7 +397,10 @@ private[kafka] class Processor(val id: Int, private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() private val inflightResponses = mutable.Map[String, RequestChannel.Response]() - private val metricTags = Map("networkProcessor" -> id.toString).asJava + private[kafka] val metricTags = mutable.LinkedHashMap( + "listener" -> listenerName.value, + "networkProcessor" -> id.toString + ).asJava newGauge("IdlePercent", new Gauge[Double] { @@ -408,7 +408,9 @@ private[kafka] class Processor(val id: Int, Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value) } }, - metricTags.asScala + // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric + // also includes the listener name) + Map("networkProcessor" -> id.toString) ) private val selector = new KSelector( http://git-wip-us.apache.org/repos/asf/kafka/blob/f21f8f2d/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 9b278ae..7678550 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -419,4 +419,25 @@ class SocketServerTest extends JUnitSuite { assertEquals(Map.empty, nonZeroMetricNamesAndValues) } + @Test + def testProcessorMetricsTags(): Unit = { + val kafkaMetricNames = metrics.metrics.keySet.asScala.filter(_.tags.asScala.get("listener").nonEmpty) + assertFalse(kafkaMetricNames.isEmpty) + + val expectedListeners = Set("PLAINTEXT", "TRACE") + kafkaMetricNames.foreach { kafkaMetricName => + assertTrue(expectedListeners.contains(kafkaMetricName.tags.get("listener"))) + } + + // legacy metrics not tagged + val yammerMetricsNames = YammerMetrics.defaultRegistry.allMetrics.asScala + .filterKeys(_.getType.equals("Processor")) + .collect { case (k, _: Gauge[_]) => k } + assertFalse(yammerMetricsNames.isEmpty) + + yammerMetricsNames.foreach { yammerMetricName => + assertFalse(yammerMetricName.getMBeanName.contains("listener=")) + } + } + }
