This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 48eb8c90ef5 KAFKA-15129: [1/N] Remove metrics in LogCleanerManager
when LogCleaner shutdown (#13924)
48eb8c90ef5 is described below
commit 48eb8c90ef5f474e54694b74377d20f9e378174c
Author: hudeqi <[email protected]>
AuthorDate: Mon Jul 3 22:14:30 2023 +0800
KAFKA-15129: [1/N] Remove metrics in LogCleanerManager when LogCleaner
shutdown (#13924)
Reviewers: Divij Vaidya <[email protected]>, Christo Lolov
<[email protected]>
---------
Co-authored-by: Deqi Hu <[email protected]>
---
core/src/main/scala/kafka/log/LogCleaner.scala | 1 +
.../main/scala/kafka/log/LogCleanerManager.scala | 38 ++++++++++++++++++----
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 21 ++++++++++--
3 files changed, 52 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 78afdc86b40..ff8a687b5ee 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -179,6 +179,7 @@ class LogCleaner(initialConfig: CleanerConfig,
*/
def removeMetrics(): Unit = {
LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
+ cleanerManager.removeMetrics()
}
/**
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index ef5df50ca8f..e8a56f4567a 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -88,17 +88,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
/* for coordinating the pausing and the cleaning of a partition */
private val pausedCleaningCond = lock.newCondition()
+ // Avoid adding legacy tags for a metric when initializing
`LogCleanerManager`
+ GaugeMetricNameWithTag.clear()
/* gauges for tracking the number of partitions marked as uncleanable for
each log directory */
for (dir <- logDirs) {
- metricsGroup.newGauge("uncleanable-partitions-count",
+ val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
+ metricsGroup.newGauge(UncleanablePartitionsCountMetricName,
() => inLock(lock) {
uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
- Map("logDirectory" -> dir.getAbsolutePath).asJava
+ metricTag
)
+
GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k
=> new java.util.ArrayList[java.util.Map[String, String]]())
+ .add(metricTag)
}
/* gauges for tracking the number of uncleanable bytes from uncleanable
partitions for each log directory */
for (dir <- logDirs) {
- metricsGroup.newGauge("uncleanable-bytes",
+ val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
+ metricsGroup.newGauge(UncleanableBytesMetricName,
() => inLock(lock) {
uncleanablePartitions.get(dir.getAbsolutePath) match {
case Some(partitions) =>
@@ -116,17 +122,19 @@ private[log] class LogCleanerManager(val logDirs:
Seq[File],
case None => 0
}
},
- Map("logDirectory" -> dir.getAbsolutePath).asJava
+ metricTag
)
+ GaugeMetricNameWithTag.computeIfAbsent(UncleanableBytesMetricName, k =>
new java.util.ArrayList[java.util.Map[String, String]]())
+ .add(metricTag)
}
/* a gauge for tracking the cleanable ratio of the dirtiest log */
@volatile private var dirtiestLogCleanableRatio = 0.0
- metricsGroup.newGauge("max-dirty-percent", () => (100 *
dirtiestLogCleanableRatio).toInt)
+ metricsGroup.newGauge(MaxDirtyPercentMetricName, () => (100 *
dirtiestLogCleanableRatio).toInt)
/* a gauge for tracking the time since the last log cleaner run, in milli
seconds */
@volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds
- metricsGroup.newGauge("time-since-last-run-ms", () =>
Time.SYSTEM.milliseconds - timeOfLastRun)
+ metricsGroup.newGauge(TimeSinceLastRunMsMetricName, () =>
Time.SYSTEM.milliseconds - timeOfLastRun)
/**
* @return the position processed for all logs.
@@ -538,6 +546,13 @@ private[log] class LogCleanerManager(val logDirs:
Seq[File],
logDirsToRemove.foreach { uncleanablePartitions.remove }
}
}
+
+ def removeMetrics(): Unit = {
+ GaugeMetricNameNoTag.foreach(metricsGroup.removeMetric)
+ GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
+ metricNameAndTags._2.asScala.foreach(tag =>
metricsGroup.removeMetric(metricNameAndTags._1, tag))
+ })
+ }
}
/**
@@ -555,6 +570,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long,
}
private[log] object LogCleanerManager extends Logging {
+ private val UncleanablePartitionsCountMetricName =
"uncleanable-partitions-count"
+ private val UncleanableBytesMetricName = "uncleanable-bytes"
+ private val MaxDirtyPercentMetricName = "max-dirty-percent"
+ private val TimeSinceLastRunMsMetricName = "time-since-last-run-ms"
+
+ private[log] val GaugeMetricNameWithTag = new java.util.HashMap[String,
java.util.List[java.util.Map[String, String]]]()
+
+ private[log] val GaugeMetricNameNoTag = Set(
+ MaxDirtyPercentMetricName,
+ TimeSinceLastRunMsMetricName
+ )
def isCompactAndDelete(log: UnifiedLog): Boolean = {
log.config.compact && log.config.delete
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 5a3ce6e43dd..f6e4f82f873 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils,
LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager,
ProducerStateManagerConfig}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
+import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyString}
import org.mockito.Mockito.{mockConstruction, times, verify,
verifyNoMoreInteractions}
@@ -71,7 +72,7 @@ class LogCleanerTest {
val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
try {
val logCleaner = new LogCleaner(new CleanerConfig(true),
- logDirs = Array(TestUtils.tempDir()),
+ logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
logs = new Pool[TopicPartition, UnifiedLog](),
logDirFailureChannel = new LogDirFailureChannel(1),
time = time)
@@ -83,11 +84,27 @@ class LogCleanerTest {
val numMetricsRegistered = LogCleaner.MetricNames.size
verify(mockMetricsGroup,
times(numMetricsRegistered)).newGauge(anyString(), any())
- // verify that each metric is removed
+ // verify that each metric in `LogCleaner` is removed
LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
+ // verify that each metric in `LogCleanerManager` is removed
+ val mockLogCleanerManagerMetricsGroup =
mockMetricsGroupCtor.constructed.get(1)
+ LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName =>
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
any()))
+
LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
+ metricNameAndTags._2.asScala.foreach(tags => {
+
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricNameAndTags._1),
any(), ArgumentMatchers.eq(tags))
+ })
+ })
+
LogCleanerManager.GaugeMetricNameNoTag.foreach(verify(mockLogCleanerManagerMetricsGroup).removeMetric(_))
+
LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
+ metricNameAndTags._2.asScala.foreach(tags => {
+
verify(mockLogCleanerManagerMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameAndTags._1),
ArgumentMatchers.eq(tags))
+ })
+ })
+
// assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup)
+ verifyNoMoreInteractions(mockLogCleanerManagerMetricsGroup)
} finally {
mockMetricsGroupCtor.close()
}