Repository: kafka
Updated Branches:
  refs/heads/trunk e0150a25e -> 640082776


KAFKA-4956: Verify client-side throttle time metrics in quota test

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jun Rao <jun...@gmail.com>

Closes #3190 from rajinisivaram/KAFKA-4956-unittest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64008277
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64008277
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64008277

Branch: refs/heads/trunk
Commit: 640082776b429067613922c576a57ec716b1dbe9
Parents: e0150a2
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Thu Jun 1 15:45:30 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu Jun 1 15:45:30 2017 +0100

----------------------------------------------------------------------
 .../integration/kafka/api/BaseQuotaTest.scala   | 38 ++++++++++++++++----
 1 file changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64008277/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 918bb55..32f19e2 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -14,19 +14,17 @@
 
 package kafka.api
 
-import java.util.{Collections, Properties}
+import java.util.{Collections, HashMap, Properties}
 
-import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer, QuotaId}
+import kafka.server.{ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, 
KafkaServer, QuotaId, QuotaType}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.metrics.{KafkaMetric, Quota}
 import org.junit.Assert._
 import org.junit.{Before, Test}
-import kafka.server.QuotaType
-import org.apache.kafka.common.metrics.KafkaMetric
 
 abstract class BaseQuotaTest extends IntegrationTestHarness {
 
@@ -83,12 +81,16 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
   def testThrottledProducerConsumer() {
 
     val numRecords = 1000
-    val produced = produceUntilThrottled(producers.head, numRecords)
+    val producer = producers.head
+    val produced = produceUntilThrottled(producer, numRecords)
     assertTrue("Should have been throttled", producerThrottleMetric.value > 0)
+    verifyProducerThrottleTimeMetric(producer)
 
     // Consumer should read in a bursty manner and get throttled immediately
-    consumeUntilThrottled(consumers.head, produced)
+    val consumer = consumers.head
+    consumeUntilThrottled(consumer, produced)
     assertTrue("Should have been throttled", consumerThrottleMetric.value > 0)
+    verifyConsumerThrottleTimeMetric(consumer)
   }
 
   @Test
@@ -152,6 +154,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
     }
 
     assertTrue("Should have been throttled", throttled)
+    verifyConsumerThrottleTimeMetric(consumer, 
Some(ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds * 1000.0))
 
     assertNotNull("Exempt requests not recorded", exemptRequestMetric)
     assertTrue("Exempt requests not recorded", exemptRequestMetric.value > 0)
@@ -205,6 +208,27 @@ abstract class BaseQuotaTest extends 
IntegrationTestHarness {
     }
   }
 
+  private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) {
+    val tags = new HashMap[String, String]
+    tags.put("client-id", producerClientId)
+    val avgMetric = producer.metrics.get(new 
MetricName("produce-throttle-time-avg", "producer-metrics", "", tags))
+    val maxMetric = producer.metrics.get(new 
MetricName("produce-throttle-time-max", "producer-metrics", "", tags))
+
+    TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 
0.0,
+        s"Producer throttle metric not updated: avg=${avgMetric.value} 
max=${maxMetric.value}")
+  }
+
+  private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], 
maxThrottleTime: Option[Double] = None) {
+    val tags = new HashMap[String, String]
+    tags.put("client-id", consumerClientId)
+    val avgMetric = consumer.metrics.get(new 
MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", 
tags))
+    val maxMetric = consumer.metrics.get(new 
MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", 
tags))
+
+    TestUtils.waitUntilTrue(() => avgMetric.value > 0.0 && maxMetric.value > 
0.0,
+        s"Consumer throttle metric not updated: avg=${avgMetric.value} 
max=${maxMetric.value}")
+    maxThrottleTime.foreach(max => assertTrue(s"Maximum consumer throttle too 
high: ${maxMetric.value}", maxMetric.value <= max))
+  }
+
   private def throttleMetricName(quotaType: QuotaType, quotaId: QuotaId): 
MetricName = {
     leaderNode.metrics.metricName("throttle-time",
                                   quotaType.toString,

Reply via email to