This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 48eb8c90ef5 KAFKA-15129: [1/N] Remove metrics in LogCleanerManager 
when LogCleaner shutdown (#13924)
48eb8c90ef5 is described below

commit 48eb8c90ef5f474e54694b74377d20f9e378174c
Author: hudeqi <[email protected]>
AuthorDate: Mon Jul 3 22:14:30 2023 +0800

    KAFKA-15129: [1/N] Remove metrics in LogCleanerManager when LogCleaner 
shutdown (#13924)
    
    Reviewers: Divij Vaidya <[email protected]>, Christo Lolov 
<[email protected]>
    
    ---------
    
    Co-authored-by: Deqi Hu <[email protected]>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  1 +
 .../main/scala/kafka/log/LogCleanerManager.scala   | 38 ++++++++++++++++++----
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 21 ++++++++++--
 3 files changed, 52 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 78afdc86b40..ff8a687b5ee 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -179,6 +179,7 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   def removeMetrics(): Unit = {
     LogCleaner.MetricNames.foreach(metricsGroup.removeMetric)
+    cleanerManager.removeMetrics()
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala 
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index ef5df50ca8f..e8a56f4567a 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -88,17 +88,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
 
+  // Avoid keeping stale metric tags from a previously created `LogCleanerManager` when initializing a new one
+  GaugeMetricNameWithTag.clear()
   /* gauges for tracking the number of partitions marked as uncleanable for each log directory */
   for (dir <- logDirs) {
-    metricsGroup.newGauge("uncleanable-partitions-count",
+    val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
+    metricsGroup.newGauge(UncleanablePartitionsCountMetricName,
       () => inLock(lock) { 
uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
-      Map("logDirectory" -> dir.getAbsolutePath).asJava
+      metricTag
     )
+    
GaugeMetricNameWithTag.computeIfAbsent(UncleanablePartitionsCountMetricName, k 
=> new java.util.ArrayList[java.util.Map[String, String]]())
+      .add(metricTag)
   }
 
   /* gauges for tracking the number of uncleanable bytes from uncleanable partitions for each log directory */
   for (dir <- logDirs) {
-    metricsGroup.newGauge("uncleanable-bytes",
+    val metricTag = Map("logDirectory" -> dir.getAbsolutePath).asJava
+    metricsGroup.newGauge(UncleanableBytesMetricName,
       () => inLock(lock) {
         uncleanablePartitions.get(dir.getAbsolutePath) match {
           case Some(partitions) =>
@@ -116,17 +122,19 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
           case None => 0
         }
       },
-      Map("logDirectory" -> dir.getAbsolutePath).asJava
+      metricTag
     )
+    GaugeMetricNameWithTag.computeIfAbsent(UncleanableBytesMetricName, k => 
new java.util.ArrayList[java.util.Map[String, String]]())
+      .add(metricTag)
   }
 
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
   @volatile private var dirtiestLogCleanableRatio = 0.0
-  metricsGroup.newGauge("max-dirty-percent", () => (100 * 
dirtiestLogCleanableRatio).toInt)
+  metricsGroup.newGauge(MaxDirtyPercentMetricName, () => (100 * 
dirtiestLogCleanableRatio).toInt)
 
   /* a gauge for tracking the time since the last log cleaner run, in milliseconds */
   @volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds
-  metricsGroup.newGauge("time-since-last-run-ms", () => 
Time.SYSTEM.milliseconds - timeOfLastRun)
+  metricsGroup.newGauge(TimeSinceLastRunMsMetricName, () => 
Time.SYSTEM.milliseconds - timeOfLastRun)
 
   /**
    * @return the position processed for all logs.
@@ -538,6 +546,13 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
       logDirsToRemove.foreach { uncleanablePartitions.remove }
     }
   }
+
+  def removeMetrics(): Unit = {
+    GaugeMetricNameNoTag.foreach(metricsGroup.removeMetric)
+    GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
+      metricNameAndTags._2.asScala.foreach(tag => 
metricsGroup.removeMetric(metricNameAndTags._1, tag))
+    })
+  }
 }
 
 /**
@@ -555,6 +570,17 @@ private case class OffsetsToClean(firstDirtyOffset: Long,
 }
 
 private[log] object LogCleanerManager extends Logging {
+  private val UncleanablePartitionsCountMetricName = 
"uncleanable-partitions-count"
+  private val UncleanableBytesMetricName = "uncleanable-bytes"
+  private val MaxDirtyPercentMetricName = "max-dirty-percent"
+  private val TimeSinceLastRunMsMetricName = "time-since-last-run-ms"
+
+  private[log] val GaugeMetricNameWithTag = new java.util.HashMap[String, 
java.util.List[java.util.Map[String, String]]]()
+
+  private[log] val GaugeMetricNameNoTag = Set(
+    MaxDirtyPercentMetricName,
+    TimeSinceLastRunMsMetricName
+  )
 
   def isCompactAndDelete(log: UnifiedLog): Boolean = {
     log.config.compact && log.config.delete
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 5a3ce6e43dd..f6e4f82f873 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, 
LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, 
ProducerStateManagerConfig}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
+import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito.{mockConstruction, times, verify, 
verifyNoMoreInteractions}
 
@@ -71,7 +72,7 @@ class LogCleanerTest {
     val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
     try {
       val logCleaner = new LogCleaner(new CleanerConfig(true),
-        logDirs = Array(TestUtils.tempDir()),
+        logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
         logs = new Pool[TopicPartition, UnifiedLog](),
         logDirFailureChannel = new LogDirFailureChannel(1),
         time = time)
@@ -83,11 +84,27 @@ class LogCleanerTest {
       val numMetricsRegistered = LogCleaner.MetricNames.size
       verify(mockMetricsGroup, 
times(numMetricsRegistered)).newGauge(anyString(), any())
       
-      // verify that each metric is removed
+      // verify that each metric in `LogCleaner` is removed
       LogCleaner.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))
 
+      // verify that each metric in `LogCleanerManager` is removed
+      val mockLogCleanerManagerMetricsGroup = 
mockMetricsGroupCtor.constructed.get(1)
+      LogCleanerManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricName),
 any()))
+      
LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
+        metricNameAndTags._2.asScala.foreach(tags => {
+          
verify(mockLogCleanerManagerMetricsGroup).newGauge(ArgumentMatchers.eq(metricNameAndTags._1),
 any(), ArgumentMatchers.eq(tags))
+        })
+      })
+      
LogCleanerManager.GaugeMetricNameNoTag.foreach(verify(mockLogCleanerManagerMetricsGroup).removeMetric(_))
+      
LogCleanerManager.GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
+        metricNameAndTags._2.asScala.foreach(tags => {
+          
verify(mockLogCleanerManagerMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameAndTags._1),
 ArgumentMatchers.eq(tags))
+        })
+      })
+
       // assert that we have verified all invocations on the mocked metrics groups
       verifyNoMoreInteractions(mockMetricsGroup)
+      verifyNoMoreInteractions(mockLogCleanerManagerMetricsGroup)
     } finally {
       mockMetricsGroupCtor.close()
     }

Reply via email to