Repository: kafka Updated Branches: refs/heads/trunk c5aeaa7d8 -> 1af096039
MINOR: Various small improvements to kafka.metrics.MetricsTest `testBrokerTopicMetricsUnregisteredAfterDeletingTopic` seemed completely broken, as it was creating a topic but producing/consuming to another one. Authored with mpburg Author: Mickael Maison <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3034 from mimaison/Fix-testBrokerTopicMetricsBytesInOut Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1af09603 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1af09603 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1af09603 Branch: refs/heads/trunk Commit: 1af096039d7a2d8fd865b0eaee28c70f10e07008 Parents: c5aeaa7 Author: Mickael Maison <[email protected]> Authored: Sat May 13 09:43:58 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat May 13 09:45:17 2017 +0100 ---------------------------------------------------------------------- .../scala/unit/kafka/metrics/MetricsTest.scala | 27 ++++++++++---------- 1 file changed, 14 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1af09603/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index d54e0b3..c586a54 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -41,7 +41,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { val numParts = 2 val topic = "topic1" - val overridingProps = new Properties() + val overridingProps = new Properties overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString) def generateConfigs() = @@ -55,16 +55,16 @@ class MetricsTest extends KafkaServerTestHarness with Logging { // create topic topic1 with 1 partition on broker 0 createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // force creation not client's specific metrics. - createAndShutdownStep("group0", "consumer0", "producer0") + createAndShutdownStep(topic, "group0", "consumer0", "producer0") //this assertion is only used for creating the metrics for DelayedFetchMetrics, it should never fail, but should not be removed assertNotNull(DelayedFetchMetrics) - val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size + val countOfStaticMetrics = Metrics.defaultRegistry.allMetrics.keySet.size for (i <- 0 to 5) { - createAndShutdownStep("group" + i % 3, "consumer" + i % 2, "producer" + i % 2) - assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size) + createAndShutdownStep(topic, "group" + i % 3, "consumer" + i % 2, "producer" + i % 2) + assertEquals(countOfStaticMetrics, Metrics.defaultRegistry.allMetrics.keySet.size) } } @@ -81,7 +81,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging { def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() { val topic = "test-broker-topic-metric" AdminUtils.createTopic(zkUtils, topic, 2, 1) - createAndShutdownStep("group0", "consumer0", "producer0") + createAndShutdownStep(topic, "group0", "consumer0", "producer0") + assertTrue("Topic metrics don't exist", checkTopicMetricsExists(topic)) assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic)) AdminUtils.deleteTopic(zkUtils, topic) TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) @@ -91,17 +92,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging { @Test def testClusterIdMetric(): Unit = { // Check if clusterId metric exists. - val metrics = Metrics.defaultRegistry().allMetrics + val metrics = Metrics.defaultRegistry.allMetrics assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=KafkaServer,name=ClusterId"), 1) } @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") - def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { + def createAndShutdownStep(topic: String, group: String, consumerId: String, producerId: String): Unit = { sendMessages(servers, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) - val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder, new StringDecoder) getMessages(topicMessageStreams1, nMessages) zkConsumerConnector1.shutdown() @@ -114,7 +115,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { val bytesIn = s"${BrokerTopicStats.BytesInPerSec},topic=$topic" val bytesOut = s"${BrokerTopicStats.BytesOutPerSec},topic=$topic" - val topicConfig = new Properties() + val topicConfig = new Properties topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, "2") createTopic(zkUtils, topic, 1, numNodes, servers, topicConfig) // Produce a few messages to create the metrics @@ -151,10 +152,10 @@ class MetricsTest extends KafkaServerTestHarness with Logging { } private def checkTopicMetricsExists(topic: String): Boolean = { - val topicMetricRegex = new Regex(".*("+topic+")$") - val metricGroups = Metrics.defaultRegistry().groupedMetrics(MetricPredicate.ALL).entrySet() + val topicMetricRegex = new Regex(".*BrokerTopicMetrics.*("+topic+")$") + val metricGroups = Metrics.defaultRegistry.groupedMetrics(MetricPredicate.ALL).entrySet for (metricGroup <- metricGroups.asScala) { - if (topicMetricRegex.pattern.matcher(metricGroup.getKey()).matches) + if (topicMetricRegex.pattern.matcher(metricGroup.getKey).matches) return true } false
