This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new dd8f823 KAFKA-9041; Flaky Test
LogCleanerIntegrationTest#testIsThreadFailed (#7542)
dd8f823 is described below
commit dd8f8230a80897d4d5807325c2f7816de9c4a2d3
Author: Viktor Somogyi <[email protected]>
AuthorDate: Sat Oct 19 19:13:19 2019 +0200
KAFKA-9041; Flaky Test LogCleanerIntegrationTest#testIsThreadFailed (#7542)
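Aims to fix the flaky LogCleanerIntegrationTest#testIsThreadFailed by
changing how metrics are cleaned: all Yammer metrics are now cleared after
each test (via an @After hook) instead of removing the single metric at the
start of the test, and the gauge lookup by name only matches metrics that
have no scope.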
Reviewers: Jason Gustafson <[email protected]>
---
.../scala/unit/kafka/log/LogCleanerIntegrationTest.scala | 16 +++++++---------
1 file changed, 7 insertions(+), 9 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5697507..d148c3f 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.{CompressionType, RecordBatch}
import org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Test}
import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.{Iterable, JavaConverters, Seq}
@@ -42,6 +42,11 @@ class LogCleanerIntegrationTest extends
AbstractLogCleanerIntegrationTest with K
val time = new MockTime()
val topicPartitions = Array(new TopicPartition("log", 0), new
TopicPartition("log", 1), new TopicPartition("log", 2))
+ @After
+ def cleanup(): Unit = {
+ TestUtils.clearYammerMetrics()
+ }
+
@Test(timeout = DEFAULT_MAX_WAIT_MS)
def testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics(): Unit = {
val largeMessageKey = 20
@@ -95,7 +100,7 @@ class LogCleanerIntegrationTest extends
AbstractLogCleanerIntegrationTest with K
}
private def getGauge[T](metricName: String): Gauge[T] = {
- getGauge(_.getName.endsWith(metricName))
+ getGauge(mName => mName.getName.endsWith(metricName) && mName.getScope == null)
}
private def getGauge[T](metricName: String, metricScope: String): Gauge[T] =
{
@@ -196,7 +201,6 @@ class LogCleanerIntegrationTest extends
AbstractLogCleanerIntegrationTest with K
@Test
def testIsThreadFailed(): Unit = {
val metricName = "DeadThreadCount"
- removeMetric(metricName) // remove the existing metric so it will be attached to this object below on creation
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize =
100000, backOffMs = 100)
cleaner.startup()
assertEquals(0, cleaner.deadThreadCount)
@@ -211,10 +215,4 @@ class LogCleanerIntegrationTest extends
AbstractLogCleanerIntegrationTest with K
assertEquals(cleaner.cleaners.size, getGauge[Int](metricName).value())
assertEquals(cleaner.cleaners.size, cleaner.deadThreadCount)
}
-
- private def removeMetric(name: String): Unit = {
- val metricName = Metrics.defaultRegistry().allMetrics()
- .asScala.find(p => p._1.getName.endsWith(name)).get._1
- Metrics.defaultRegistry().removeMetric(metricName)
- }
}