Repository: kafka
Updated Branches:
  refs/heads/trunk 911c768bc -> d96866243
KAFKA-5194; Include only client traffic in BytesOutPerSec metric (KIP-153)

Also added 2 new metrics to account for incoming/outgoing traffic due to
internal replication:
- ReplicationBytesInPerSec
- ReplicationBytesOutPerSec

Author: Mickael Maison <[email protected]>

Reviewers: Jun Rao <[email protected]>, Ismael Juma <[email protected]>

Closes #3003 from mimaison/KAFKA-5194

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9686624
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9686624
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9686624

Branch: refs/heads/trunk
Commit: d96866243990a4d739ec8fc239f0c2758bba66e8
Parents: 911c768
Author: Mickael Maison <[email protected]>
Authored: Fri May 12 11:50:19 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Fri May 12 12:33:48 2017 +0100

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala |  3 +-
 .../kafka/server/KafkaRequestHandler.scala      | 31 +++++++++++
 .../kafka/server/ReplicaFetcherThread.scala     |  1 +
 .../scala/unit/kafka/metrics/MetricsTest.scala  | 56 ++++++++++++++++----
 .../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++++++++-
 5 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fbd74ac..150d16d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -516,8 +516,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchedPartitionData.put(topicPartition, data)

         // record the bytes out metrics only when the response is being sent
-        BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesOutRate.mark(data.records.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.records.sizeInBytes)
+        BrokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
       }

       val response = new FetchResponse(fetchedPartitionData, 0)
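For context on the isFromFollower flag used above: a fetch request carries a
replicaId, and follower brokers identify themselves with their own (non-negative)
broker id while regular consumers send -1. That replicaId convention is standard
Kafka fetch-protocol behaviour rather than part of this patch; a minimal sketch of
the classification that decides between the client and replication meters:

    // Consumers fetch with replicaId = -1; follower brokers fetch with their broker id.
    // A non-negative replicaId therefore marks the request as replication traffic.
    def isFromFollower(replicaId: Int): Boolean = replicaId >= 0

    // isFromFollower == true  -> ReplicationBytesOutPerSec
    // isFromFollower == false -> BytesOutPerSec (per-topic and all-topics)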
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index a1600cb..d1d63f1 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -115,6 +115,12 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
   val bytesInRate = newMeter(BrokerTopicStats.BytesInPerSec, "bytes", TimeUnit.SECONDS, tags)
   val bytesOutRate = newMeter(BrokerTopicStats.BytesOutPerSec, "bytes", TimeUnit.SECONDS, tags)
   val bytesRejectedRate = newMeter(BrokerTopicStats.BytesRejectedPerSec, "bytes", TimeUnit.SECONDS, tags)
+  private[server] val replicationBytesInRate =
+    if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags))
+    else None
+  private[server] val replicationBytesOutRate =
+    if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))
+    else None
   val failedProduceRequestRate = newMeter(BrokerTopicStats.FailedProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   val failedFetchRequestRate = newMeter(BrokerTopicStats.FailedFetchRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   val totalProduceRequestRate = newMeter(BrokerTopicStats.TotalProduceRequestsPerSec, "requests", TimeUnit.SECONDS, tags)
@@ -125,6 +131,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
     removeMetric(BrokerTopicStats.BytesInPerSec, tags)
     removeMetric(BrokerTopicStats.BytesOutPerSec, tags)
     removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags)
+    removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
+    removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
     removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)
@@ -137,6 +145,8 @@ object BrokerTopicStats extends Logging {
   val BytesInPerSec = "BytesInPerSec"
   val BytesOutPerSec = "BytesOutPerSec"
   val BytesRejectedPerSec = "BytesRejectedPerSec"
+  val ReplicationBytesInPerSec = "ReplicationBytesInPerSec"
+  val ReplicationBytesOutPerSec = "ReplicationBytesOutPerSec"
   val FailedProduceRequestsPerSec = "FailedProduceRequestsPerSec"
   val FailedFetchRequestsPerSec = "FailedFetchRequestsPerSec"
   val TotalProduceRequestsPerSec = "TotalProduceRequestsPerSec"
@@ -152,9 +162,30 @@ object BrokerTopicStats extends Logging {
     stats.getAndMaybePut(topic)
   }

+  def updateReplicationBytesIn(value: Long) {
+    getBrokerAllTopicsStats.replicationBytesInRate.foreach { metric =>
+      metric.mark(value)
+    }
+  }
+
+  private def updateReplicationBytesOut(value: Long) {
+    getBrokerAllTopicsStats.replicationBytesOutRate.foreach { metric =>
+      metric.mark(value)
+    }
+  }
+
   def removeMetrics(topic: String) {
     val metrics = stats.remove(topic)
     if (metrics != null)
       metrics.close()
   }
+
+  def updateBytesOut(topic: String, isFollower: Boolean, value: Long) {
+    if (isFollower) {
+      updateReplicationBytesOut(value)
+    } else {
+      getBrokerTopicStats(topic).bytesOutRate.mark(value)
+      getBrokerAllTopicsStats.bytesOutRate.mark(value)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 9016fcf..1148e92 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -112,6 +112,7 @@ class ReplicaFetcherThread(name: String,
       trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark")
       if (quota.isThrottled(topicPartition))
         quota.record(records.sizeInBytes)
+      BrokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
     } catch {
       case e: KafkaStorageException =>
         fatal(s"Disk error while replicating data for $topicPartition", e)
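Once registered, the new meters are exposed over JMX like the existing
BrokerTopicMetrics. A hypothetical monitoring snippet, run inside the broker JVM,
reading the cumulative count of ReplicationBytesInPerSec (the MBean name follows
the kafka.server BrokerTopicMetrics convention; Count is the standard attribute
Yammer meters expose):

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    // Look up the aggregate (untagged) replication meter and read its cumulative count.
    val mbeanServer = ManagementFactory.getPlatformMBeanServer
    val replicationBytesIn =
      new ObjectName("kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec")
    val count = mbeanServer.getAttribute(replicationBytesIn, "Count").asInstanceOf[Long]
    println(s"Replication bytes in so far: $count")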
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 16f0636..f33055f 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -20,8 +20,8 @@ package kafka.metrics
 import java.util.Properties

 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Metric, MetricName, MetricPredicate}
-import org.junit.{After, Test}
+import com.yammer.metrics.core.{Meter, MetricPredicate}
+import org.junit.Test
 import org.junit.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
@@ -48,11 +48,6 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   val nMessages = 2

-  @After
-  override def tearDown() {
-    super.tearDown()
-  }
-
   @Test
   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
   def testMetricsLeak() {
@@ -93,10 +88,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   }

   @Test
-  def testClusterIdMetric(): Unit ={
+  def testClusterIdMetric(): Unit = {
     // Check if clusterId metric exists.
     val metrics = Metrics.defaultRegistry().allMetrics
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName().equals("kafka.server:type=KafkaServer,name=ClusterId")), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1)
   }

   @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0")
@@ -111,10 +106,51 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     zkConsumerConnector1.shutdown()
   }

+  @Test
+  def testBrokerTopicMetricsBytesInOut(): Unit = {
+    val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
+    val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
+    val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic"
+    val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic"
+
+    createTopic(zkUtils, topic, 1, numNodes, servers)
+    // Produce a few messages to create the metrics
+    TestUtils.produceMessages(servers, topic, nMessages)
+
+    val initialReplicationBytesIn = meterCount(replicationBytesIn)
+    val initialReplicationBytesOut = meterCount(replicationBytesOut)
+    val initialBytesIn = meterCount(bytesIn)
+    val initialBytesOut = meterCount(bytesOut)
+
+    // Produce a few messages to make the metrics tick
+    TestUtils.produceMessages(servers, topic, nMessages)
+
+    assertTrue(meterCount(replicationBytesIn) > initialReplicationBytesIn)
+    assertTrue(meterCount(replicationBytesOut) > initialReplicationBytesOut)
+    assertTrue(meterCount(bytesIn) > initialBytesIn)
+    // BytesOut doesn't include replication, so it shouldn't have changed
+    assertEquals(initialBytesOut, meterCount(bytesOut))
+
+    // Consume messages to make bytesOut tick
+    TestUtils.consumeTopicRecords(servers, topic, nMessages * 2)
+
+    assertTrue(meterCount(bytesOut) > initialBytesOut)
+  }
+
+  private def meterCount(metricName: String): Long = {
+    Metrics.defaultRegistry.allMetrics.asScala
+      .filterKeys(_.getMBeanName.endsWith(metricName))
+      .values
+      .headOption
+      .getOrElse(fail(s"Unable to find metric $metricName"))
+      .asInstanceOf[Meter]
+      .count
+  }
+
   private def checkTopicMetricsExists(topic: String): Boolean = {
     val topicMetricRegex = new Regex(".*("+topic+")$")
     val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet()
-    for(metricGroup <- metricGroups.asScala) {
+    for (metricGroup <- metricGroups.asScala) {
       if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches)
         return true
     }
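One practical consequence the test encodes: after this change BytesOutPerSec no
longer includes replication, so dashboards that used it as total outbound traffic
now need the sum of both meters. A hypothetical helper in the same spirit as
meterCount above, matching the untagged, all-topics meters by exact MBean name to
avoid the ReplicationBytesOutPerSec/BytesOutPerSec suffix collision:

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Meter
    import scala.collection.JavaConverters._

    // Read the cumulative count of an all-topics broker meter by its exact MBean name.
    def brokerMeterCount(meterName: String): Long = {
      val mbean = s"kafka.server:type=BrokerTopicMetrics,name=$meterName"
      Metrics.defaultRegistry.allMetrics.asScala
        .collectFirst { case (name, meter: Meter) if name.getMBeanName == mbean => meter.count }
        .getOrElse(0L)
    }

    // The pre-KIP-153 "BytesOutPerSec" is now the sum of client and replication output.
    val totalBytesOut = brokerMeterCount("BytesOutPerSec") + brokerMeterCount("ReplicationBytesOutPerSec")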
http://git-wip-us.apache.org/repos/asf/kafka/blob/d9686624/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a51a07c..f254ee4 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,7 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.nio.charset.Charset
 import java.security.cert.X509Certificate
-import java.util.Properties
+import java.util.{ArrayList, Collections, Properties}
 import java.util.concurrent.{Callable, Executors, TimeUnit}
 import javax.net.ssl.X509TrustManager
@@ -40,7 +40,7 @@ import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils._
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.{ListenerName, Mode}
@@ -1298,6 +1298,27 @@ object TestUtils extends Logging {
     assertTrue(s"$message failed with exception(s) $exceptions", exceptions.isEmpty)
   }
+
+  def consumeTopicRecords[K, V](servers: Seq[KafkaServer], topic: String, numMessages: Int,
+                                waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+    val consumer = createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+      securityProtocol = SecurityProtocol.PLAINTEXT)
+    try {
+      consumer.subscribe(Collections.singleton(topic))
+      consumeRecords(consumer, numMessages, waitTime)
+    } finally consumer.close()
+  }
+
+  def consumeRecords[K, V](consumer: KafkaConsumer[K, V], numMessages: Int,
+                           waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Seq[ConsumerRecord[K, V]] = {
+    val records = new ArrayBuffer[ConsumerRecord[K, V]]()
+    waitUntilTrue(() => {
+      records ++= consumer.poll(50).asScala
+      records.size >= numMessages
+    }, s"Consumed ${records.size} records until timeout instead of the expected $numMessages records", waitTime)
+    assertEquals("Consumed more records than expected", numMessages, records.size)
+    records
+  }
 }

 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
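For reference, a hypothetical test built on the new helper might look like this
(topic name and message count are illustrative; servers comes from the
KafkaServerTestHarness as in MetricsTest above):

    // Produce a batch, then block until the whole batch has been consumed back.
    TestUtils.produceMessages(servers, "test-topic", 10)
    val records = TestUtils.consumeTopicRecords(servers, "test-topic", 10)
    assertEquals(10, records.size)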
