Repository: kafka
Updated Branches:
  refs/heads/trunk 1949a76bc -> 006630fd9
MINOR: Fix metric collection NPE during shutdown

Collecting socket server metrics during shutdown may throw NullPointerException

Author: Xavier Léauté <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #2221 from xvrl/fix-metrics-npe-on-shutdown

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/006630fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/006630fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/006630fd

Branch: refs/heads/trunk
Commit: 006630fd93d8efb823e5b5f7d61584138df984a6
Parents: 1949a76
Author: Xavier Léauté <[email protected]>
Authored: Thu Dec 8 22:26:25 2016 +0000
Committer: Ismael Juma <[email protected]>
Committed: Thu Dec 8 23:42:48 2016 +0000

----------------------------------------------------------------------
 .../apache/kafka/common/metrics/Metrics.java    |  4 +++
 .../main/scala/kafka/network/SocketServer.scala |  7 ++--
 .../unit/kafka/network/SocketServerTest.scala   | 34 ++++++++++++++------
 3 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index bd20e13..78dad18 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -389,6 +389,10 @@ public class Metrics implements Closeable {
         return this.metrics;
     }
 
+    public KafkaMetric metric(MetricName metricName) {
+        return this.metrics.get(metricName);
+    }
+
     /**
      * This iterates over every Sensor and triggers a removeSensor if it has expired
      * Package private for testing
http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e98445f..55061ed 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -105,8 +105,9 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
   newGauge("NetworkProcessorAvgIdlePercent",
     new Gauge[Double] {
-      def value = allMetricNames.map( metricName =>
-        metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
+      def value = allMetricNames.map { metricName =>
+        Option(metrics.metric(metricName)).fold(0.0)(_.value)
+      }.sum / totalProcessorThreads
     }
   )
 
@@ -389,7 +390,7 @@ private[kafka] class Processor(val id: Int,
   newGauge("IdlePercent",
     new Gauge[Double] {
       def value = {
-        metrics.metrics().get(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
+        Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
       }
     },
     metricTags.asScala

http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7d0764b..c6f90ff 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -17,27 +17,29 @@
 
 package kafka.network
 
-import java.net._
-import javax.net.ssl._
 import java.io._
-import java.util.HashMap
-import java.util.Random
+import java.net._
 import java.nio.ByteBuffer
+import java.util.{HashMap, Random}
+import javax.net.ssl._
 
+import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.{Metrics => YammerMetrics}
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.NetworkSend
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.apache.kafka.common.record.MemoryRecords
 import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 
+import scala.collection.JavaConverters.mapAsScalaMapConverter
 import scala.collection.mutable.ArrayBuffer
 
 class SocketServerTest extends JUnitSuite {
@@ -395,4 +397,18 @@ class SocketServerTest extends JUnitSuite {
 
   }
 
+  @Test
+  def testMetricCollectionAfterShutdown(): Unit = {
+    server.shutdown()
+
+    val sum = YammerMetrics
+      .defaultRegistry
+      .allMetrics.asScala
+      .filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
+      .collect { case (_, metric: Gauge[_]) => metric.value.asInstanceOf[Double] }
+      .sum
+
+    assertEquals(0, sum, 0)
+  }
+
 }
