Repository: kafka Updated Branches: refs/heads/trunk f56bbb651 -> 50eacb7b4
MINOR: Fix one flaky test in MetricsTest and improve checks for another * Fix flakiness of `testBrokerTopicMetricsUnregisteredAfterDeletingTopic` by not consuming messages. Filed KAFKA-5238 to track the issue that metrics for a deleted topic may be re-created if there are fetch requests in the purgatory. * Check the log size in `testBrokerTopicMetricsBytesInOut` before attempting to read the `replicationBytesIn` metric. This helps understand where things have gone wrong if the metric has not increased (i.e. if it was an issue replicating or with the metric). * Only remove the replication bytes in/out if the metrics are defined. This should not affect the behaviour due to the tags, but it makes the code clearer. We've seen some cases in Jenkins when the metric does not exist and it's still unclear how that can happen. Author: Ismael Juma <[email protected]> Reviewers: Rajini Sivaram <[email protected]> Closes #3042 from ijuma/more-informative-assertion-for-flaky-metrics-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/50eacb7b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/50eacb7b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/50eacb7b Branch: refs/heads/trunk Commit: 50eacb7b45143dbc26a341620290991428194e4d Parents: f56bbb6 Author: Ismael Juma <[email protected]> Authored: Mon May 15 14:08:18 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Mon May 15 14:08:18 2017 +0100 ---------------------------------------------------------------------- .../kafka/server/KafkaRequestHandler.scala | 6 ++-- .../scala/unit/kafka/metrics/MetricsTest.scala | 35 +++++++++++++------- 2 files changed, 27 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/50eacb7b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index d1d63f1..8dfbe64 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -131,8 +131,10 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { removeMetric(BrokerTopicStats.BytesInPerSec, tags) removeMetric(BrokerTopicStats.BytesOutPerSec, tags) removeMetric(BrokerTopicStats.BytesRejectedPerSec, tags) - removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags) - removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags) + if (replicationBytesInRate.isDefined) + removeMetric(BrokerTopicStats.ReplicationBytesInPerSec, tags) + if (replicationBytesOutRate.isDefined) + removeMetric(BrokerTopicStats.ReplicationBytesOutPerSec, tags) removeMetric(BrokerTopicStats.FailedProduceRequestsPerSec, tags) removeMetric(BrokerTopicStats.FailedFetchRequestsPerSec, tags) removeMetric(BrokerTopicStats.TotalProduceRequestsPerSec, tags) http://git-wip-us.apache.org/repos/asf/kafka/blob/50eacb7b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index c586a54..745fea6 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -35,11 +35,11 @@ import scala.collection.JavaConverters._ import scala.util.matching.Regex import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector} import kafka.log.LogConfig +import org.apache.kafka.common.TopicPartition class MetricsTest extends KafkaServerTestHarness with Logging { val numNodes = 2 val numParts = 2 - val topic = "topic1" val overridingProps 
= new Properties overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) @@ -52,6 +52,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @Test @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") def testMetricsLeak() { + val topic = "test-metrics-leak" // create topic topic1 with 1 partition on broker 0 createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // force creation not client's specific metrics. @@ -74,19 +75,21 @@ class MetricsTest extends KafkaServerTestHarness with Logging { AdminUtils.createTopic(zkUtils, topic, 1, 1) AdminUtils.deleteTopic(zkUtils, topic) TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) - assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic)) + assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic)) } @Test def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() { val topic = "test-broker-topic-metric" AdminUtils.createTopic(zkUtils, topic, 2, 1) - createAndShutdownStep(topic, "group0", "consumer0", "producer0") - assertTrue("Topic metrics don't exist", checkTopicMetricsExists(topic)) + // Produce a few messages to create the metrics + // Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238 + TestUtils.produceMessages(servers, topic, nMessages) + assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty) assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic)) AdminUtils.deleteTopic(zkUtils, topic) TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) - assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic)) + assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic)) } @Test @@ -110,6 +113,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @Test def testBrokerTopicMetricsBytesInOut(): Unit = 
{ + val topic = "test-bytes-in-out" val replicationBytesIn = BrokerTopicStats.ReplicationBytesInPerSec val replicationBytesOut = BrokerTopicStats.ReplicationBytesOutPerSec val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic" @@ -121,6 +125,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging { // Produce a few messages to create the metrics TestUtils.produceMessages(servers, topic, nMessages) + // Check the log size for each broker so that we can distinguish between failures caused by replication issues + // versus failures caused by the metrics + val topicPartition = new TopicPartition(topic, 0) + servers.foreach { server => + val log = server.logManager.logsByTopicPartition.get(new TopicPartition(topic, 0)) + val brokerId = server.config.brokerId + val logSize = log.map(_.size) + assertTrue(s"Expected broker $brokerId to have a Log for $topicPartition with positive size, actual: $logSize", + logSize.map(_ > 0).getOrElse(false)) + } + val initialReplicationBytesIn = meterCount(replicationBytesIn) val initialReplicationBytesOut = meterCount(replicationBytesOut) val initialBytesIn = meterCount(bytesIn) @@ -151,13 +166,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging { .count } - private def checkTopicMetricsExists(topic: String): Boolean = { + private def topicMetricGroups(topic: String): Set[String] = { val topicMetricRegex = new Regex(".*BrokerTopicMetrics.*("+topic+")$") - val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).entrySet - for (metricGroup <- metricGroups.asScala) { - if (topicMetricRegex.pattern.matcher(metricGroup.getKey).matches) - return true - } - false + val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).keySet.asScala + metricGroups.filter(topicMetricRegex.pattern.matcher(_).matches) } }
