Repository: kafka
Updated Branches:
  refs/heads/trunk f56bbb651 -> 50eacb7b4


MINOR: Fix one flaky test in MetricsTest and improve checks for another

* Fix flakiness of `testBrokerTopicMetricsUnregisteredAfterDeletingTopic` by not
consuming messages. Filed KAFKA-5238 to track the issue that metrics for a deleted
topic may be re-created if there are fetch requests in the purgatory. A condensed
sketch of the revised test flow follows this list.

* Check the log size in `testBrokerTopicMetricsBytesInOut` before attempting to read
the `replicationBytesIn` metric. This helps pinpoint where things have gone wrong
if the metric has not increased (i.e. whether it was a replication issue or a
problem with the metric itself). See the second sketch after this list.

* Only remove the replication bytes in/out metrics if they are defined. This should
not affect the behaviour due to the tags, but it makes the code clearer. We've seen
some cases in Jenkins where the metric does not exist and it's still unclear how
that can happen. The last sketch after this list shows why these meters can be
undefined.
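
For reference, a condensed sketch of the revised test flow. It is an illustration
only: it assumes the `MetricsTest` harness fields used in the diff below (`zkUtils`,
`servers`, `nMessages`) and the `topicMetricGroups` helper introduced by this patch.

    // Produce only; consuming could leave fetch requests in the purgatory that
    // re-create the metrics after deletion (KAFKA-5238).
    val topic = "test-broker-topic-metric"
    AdminUtils.createTopic(zkUtils, topic, 2, 1)
    TestUtils.produceMessages(servers, topic, nMessages)
    assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
    assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
    AdminUtils.deleteTopic(zkUtils, topic)
    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
    assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))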
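
The log-size check added to `testBrokerTopicMetricsBytesInOut` is roughly the
following (same assumptions as above; `topic` is the test's own topic):

    // Assert that every broker has a non-empty local log for partition 0 before
    // reading the replication meters, so a failure can be attributed to either
    // replication or the metrics themselves.
    val topicPartition = new TopicPartition(topic, 0)
    servers.foreach { server =>
      val log = server.logManager.logsByTopicPartition.get(topicPartition)
      val brokerId = server.config.brokerId
      val logSize = log.map(_.size)
      assertTrue(s"Expected broker $brokerId to have a Log for $topicPartition with positive size, actual: $logSize",
        logSize.map(_ > 0).getOrElse(false))
    }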
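
The `isDefined` guard works because `replicationBytesInRate` and
`replicationBytesOutRate` are `Option`-valued. A minimal sketch, assuming the meters
are created only for the aggregate (all-topics) instance; this is simplified and not
verbatim from `KafkaRequestHandler.scala`:

    // Hypothetical, simplified definition: only the aggregate instance (name = None)
    // registers the replication meters; per-topic instances keep them as None, so
    // there is nothing to remove for a deleted topic.
    private[server] val replicationBytesInRate =
      if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesInPerSec, "bytes", TimeUnit.SECONDS, tags))
      else None
    private[server] val replicationBytesOutRate =
      if (name.isEmpty) Some(newMeter(BrokerTopicStats.ReplicationBytesOutPerSec, "bytes", TimeUnit.SECONDS, tags))
      else None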

Author: Ismael Juma <[email protected]>

Reviewers: Rajini Sivaram <[email protected]>

Closes #3042 from ijuma/more-informative-assertion-for-flaky-metrics-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/50eacb7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/50eacb7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/50eacb7b

Branch: refs/heads/trunk
Commit: 50eacb7b45143dbc26a341620290991428194e4d
Parents: f56bbb6
Author: Ismael Juma <[email protected]>
Authored: Mon May 15 14:08:18 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Mon May 15 14:08:18 2017 +0100

----------------------------------------------------------------------
 .../kafka/server/KafkaRequestHandler.scala      |  6 ++--
 .../scala/unit/kafka/metrics/MetricsTest.scala  | 35 +++++++++++++-------
 2 files changed, 27 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/50eacb7b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d1d63f1..8dfbe64 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -131,8 +131,10 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
     removeMetric(BrokerTopicStats.BytesInPerSec, tags)
     removeMetric(BrokerTopicStats.BytesOutPerSec, tags)
     removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags)
-    removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
-    removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
+    if (replicationBytesInRate.isDefined)
+      removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags)
+    if (replicationBytesOutRate.isDefined)
+      removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags)
     removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags)
     removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags)

http://git-wip-us.apache.org/repos/asf/kafka/blob/50eacb7b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index c586a54..745fea6 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -35,11 +35,11 @@ import scala.collection.JavaConverters._
 import scala.util.matching.Regex
 import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
 import kafka.log.LogConfig
+import org.apache.kafka.common.TopicPartition
 
 class MetricsTest extends KafkaServerTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
-  val topic = "topic1"
 
   val overridingProps = new Properties
   overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
@@ -52,6 +52,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   @deprecated("This test has been deprecated and it will be removed in a 
future release", "0.10.0.0")
   def testMetricsLeak() {
+    val topic = "test-metrics-leak"
     // create topic topic1 with 1 partition on broker 0
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
     // force creation not client's specific metrics.
@@ -74,19 +75,21 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
     AdminUtils.deleteTopic(zkUtils, topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    assertFalse("Topic metrics exists after deleteTopic", 
checkTopicMetricsExists(topic))
+    assertEquals("Topic metrics exists after deleteTopic", Set.empty, 
topicMetricGroups(topic))
   }
 
   @Test
   def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
     val topic = "test-broker-topic-metric"
     AdminUtils.createTopic(zkUtils, topic, 2, 1)
-    createAndShutdownStep(topic, "group0", "consumer0", "producer0")
-    assertTrue("Topic metrics don't exist", checkTopicMetricsExists(topic))
+    // Produce a few messages to create the metrics
+    // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
+    TestUtils.produceMessages(servers, topic, nMessages)
+    assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
     assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
     AdminUtils.deleteTopic(zkUtils, topic)
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
-    assertFalse("Topic metrics exists after deleteTopic", 
checkTopicMetricsExists(topic))
+    assertEquals("Topic metrics exists after deleteTopic", Set.empty, 
topicMetricGroups(topic))
   }
 
   @Test
@@ -110,6 +113,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
 
   @Test
   def testBrokerTopicMetricsBytesInOut(): Unit = {
+    val topic = "test-bytes-in-out"
     val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec
     val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec
     val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic"
@@ -121,6 +125,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     // Produce a few messages to create the metrics
     TestUtils.produceMessages(servers, topic, nMessages)
 
+    // Check the log size for each broker so that we can distinguish between failures caused by replication issues
+    // versus failures caused by the metrics
+    val topicPartition = new TopicPartition(topic, 0)
+    servers.foreach { server =>
+      val log = server.logManager.logsByTopicPartition.get(new TopicPartition(topic, 0))
+      val brokerId = server.config.brokerId
+      val logSize = log.map(_.size)
+      assertTrue(s"Expected broker $brokerId to have a Log for $topicPartition 
with positive size, actual: $logSize",
+        logSize.map(_ > 0).getOrElse(false))
+    }
+
     val initialReplicationBytesIn = meterCount(replicationBytesIn)
     val initialReplicationBytesOut = meterCount(replicationBytesOut)
     val initialBytesIn = meterCount(bytesIn)
@@ -151,13 +166,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
       .count
   }
 
-  private def checkTopicMetricsExists(topic: String): Boolean = {
+  private def topicMetricGroups(topic: String): Set[String] = {
     val topicMetricRegex = new Regex(".*BrokerTopicMetrics.*("+topic+")$")
-    val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).entrySet
-    for (metricGroup <- metricGroups.asScala) {
-      if (topicMetricRegex.pattern.matcher(metricGroup.getKey).matches)
-        return true
-    }
-    false
+    val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).keySet.asScala
+    metricGroups.filter(topicMetricRegex.pattern.matcher(_).matches)
   }
 }
