This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 15ec0536654 KAFKA-18656 Backport KAFKA-18597 to 4.0 (#20026)
15ec0536654 is described below

commit 15ec0536654f36bea73945f594f640b2592d689e
Author: Logan Zhu <loganzh...@outlook.com>
AuthorDate: Wed Jun 25 19:19:30 2025 +0800

    KAFKA-18656 Backport KAFKA-18597 to 4.0 (#20026)
    
    Backport of [KAFKA-18597](https://github.com/apache/kafka/pull/18627) to
    the 4.0 branch.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  19 +--
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 168 ++++++++++++++++++---
 2 files changed, 153 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5d7ee518963..e1f4d4afa2d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -117,14 +117,14 @@ class LogCleaner(initialConfig: CleanerConfig,
 
   /**
    * @param f to compute the result
-   * @return the max value (int value) or 0 if there is no cleaner
+   * @return the max value or 0 if there is no cleaner
    */
-  private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Int =
-    cleaners.map(f).maxOption.getOrElse(0.0d).toInt
+  private[log] def maxOverCleanerThreads(f: CleanerThread => Double): Double =
+    cleaners.map(f).maxOption.getOrElse(0.0d)
 
   /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
   metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
-    () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
+    () => (maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
 
   /* a metric to track the recopy rate of each thread's last cleaning */
   metricsGroup.newGauge(CleanerRecopyPercentMetricName, () => {
@@ -134,12 +134,12 @@ class LogCleaner(initialConfig: CleanerConfig,
   })
 
   /* a metric to track the maximum cleaning time for the last cleaning from each thread */
-  metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
+  metricsGroup.newGauge(MaxCleanTimeMetricName, () => maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
 
   // a metric to track delay between the time when a log is required to be compacted
   // as determined by max compaction lag and the time of last cleaner run.
   metricsGroup.newGauge(MaxCompactionDelayMetricsName,
-    () => maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
+    () => (maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
 
   metricsGroup.newGauge(DeadThreadCountMetricName, () => deadThreadCount)
 
@@ -523,10 +523,11 @@ object LogCleaner {
 
   }
 
-  private val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
+  // Visible for test.
+  private[log] val MaxBufferUtilizationPercentMetricName = "max-buffer-utilization-percent"
   private val CleanerRecopyPercentMetricName = "cleaner-recopy-percent"
-  private val MaxCleanTimeMetricName = "max-clean-time-secs"
-  private val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
+  private[log] val MaxCleanTimeMetricName = "max-clean-time-secs"
+  private[log] val MaxCompactionDelayMetricsName = "max-compaction-delay-secs"
   private val DeadThreadCountMetricName = "DeadThreadCount"
   // package private for testing
   private[log] val MetricNames = Set(
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 106a4a78a93..9ca39163194 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -18,6 +18,7 @@
 package kafka.log
 
 import kafka.common._
+import kafka.log.LogCleaner.{MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -2048,42 +2049,159 @@ class LogCleanerTest extends Logging {
   }
 
   @Test
-  def testMaxOverCleanerThreads(): Unit = {
-    val logCleaner =  new LogCleaner(new CleanerConfig(true),
+  def testMaxBufferUtilizationPercentMetric(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
       logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
       logs = new Pool[TopicPartition, UnifiedLog](),
       logDirFailureChannel = new LogDirFailureChannel(1),
-      time = time)
+      time = time
+    )
+
+    def assertMaxBufferUtilizationPercent(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxBufferUtilizationPercentMetricName,
+        () => (logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100).toInt)
+      assertEquals(expected, gauge.value())
+    }
+
+    try {
+      // No CleanerThreads
+      assertMaxBufferUtilizationPercent(0)
+
+      val cleaners = logCleaner.cleaners
+
+      val cleaner1 = new logCleaner.CleanerThread(1)
+      cleaner1.lastStats = new CleanerStats(time)
+      cleaner1.lastStats.bufferUtilization = 0.75
+      cleaners += cleaner1
+
+      val cleaner2 = new logCleaner.CleanerThread(2)
+      cleaner2.lastStats = new CleanerStats(time)
+      cleaner2.lastStats.bufferUtilization = 0.85
+      cleaners += cleaner2
+
+      val cleaner3 = new logCleaner.CleanerThread(3)
+      cleaner3.lastStats = new CleanerStats(time)
+      cleaner3.lastStats.bufferUtilization = 0.65
+      cleaners += cleaner3
+
+      // expect the gauge value to reflect the maximum bufferUtilization
+      assertMaxBufferUtilizationPercent(85)
+
+      // Update bufferUtilization and verify the gauge value updates
+      cleaner1.lastStats.bufferUtilization = 0.9
+      assertMaxBufferUtilizationPercent(90)
+
+      // All CleanerThreads have the same bufferUtilization
+      cleaners.foreach(_.lastStats.bufferUtilization = 0.5)
+      assertMaxBufferUtilizationPercent(50)
+    } finally {
+      logCleaner.shutdown()
+    }
+  }
+
+  @Test
+  def testMaxCleanTimeMetric(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
+      logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time
+    )
+
+    def assertMaxCleanTime(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxCleanTimeMetricName,
+        () => logCleaner.maxOverCleanerThreads(_.lastStats.elapsedSecs).toInt)
+      assertEquals(expected, gauge.value())
+    }
 
-    val cleaners = logCleaner.cleaners
+    try {
+      // No CleanerThreads
+      assertMaxCleanTime(0)
 
-    val cleaner1 = new logCleaner.CleanerThread(1)
-    cleaner1.lastStats = new CleanerStats(time)
-    cleaner1.lastStats.bufferUtilization = 0.75
-    cleaners += cleaner1
+      val cleaners = logCleaner.cleaners
 
-    val cleaner2 = new logCleaner.CleanerThread(2)
-    cleaner2.lastStats = new CleanerStats(time)
-    cleaner2.lastStats.bufferUtilization = 0.85
-    cleaners += cleaner2
+      val cleaner1 = new logCleaner.CleanerThread(1)
+      cleaner1.lastStats = new CleanerStats(time)
+      cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 1_000L
+      cleaners += cleaner1
 
-    val cleaner3 = new logCleaner.CleanerThread(3)
-    cleaner3.lastStats = new CleanerStats(time)
-    cleaner3.lastStats.bufferUtilization = 0.65
-    cleaners += cleaner3
+      val cleaner2 = new logCleaner.CleanerThread(2)
+      cleaner2.lastStats = new CleanerStats(time)
+      cleaner2.lastStats.endTime = cleaner2.lastStats.startTime + 2_000L
+      cleaners += cleaner2
 
-    assertEquals(0, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+      val cleaner3 = new logCleaner.CleanerThread(3)
+      cleaner3.lastStats = new CleanerStats(time)
+      cleaner3.lastStats.endTime = cleaner3.lastStats.startTime + 3_000L
+      cleaners += cleaner3
 
-    cleaners.clear()
+      // expect the gauge value to reflect the maximum cleanTime
+      assertMaxCleanTime(3)
 
-    cleaner1.lastStats.bufferUtilization = 5d
-    cleaners += cleaner1
-    cleaner2.lastStats.bufferUtilization = 6d
-    cleaners += cleaner2
-    cleaner3.lastStats.bufferUtilization = 7d
-    cleaners += cleaner3
+      // Update cleanTime and verify the gauge value updates
+      cleaner1.lastStats.endTime = cleaner1.lastStats.startTime + 4_000L
+      assertMaxCleanTime(4)
 
-    assertEquals(7, logCleaner.maxOverCleanerThreads(_.lastStats.bufferUtilization))
+      // All CleanerThreads have the same cleanTime
+      cleaners.foreach(cleaner => cleaner.lastStats.endTime = cleaner.lastStats.startTime + 1_500L)
+      assertMaxCleanTime(1)
+    } finally {
+      logCleaner.shutdown()
+    }
+  }
+
+  @Test
+  def testMaxCompactionDelayMetrics(): Unit = {
+    val logCleaner = new LogCleaner(
+      new CleanerConfig(true),
+      logDirs = Array(TestUtils.tempDir(), TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time
+    )
+
+    def assertMaxCompactionDelay(expected: Int): Unit = {
+      val gauge = logCleaner.metricsGroup.newGauge(MaxCompactionDelayMetricsName,
+        () => (logCleaner.maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000).toInt)
+      assertEquals(expected, gauge.value())
+    }
+
+    try {
+      // No CleanerThreads
+      assertMaxCompactionDelay(0)
+
+      val cleaners = logCleaner.cleaners
+
+      val cleaner1 = new logCleaner.CleanerThread(1)
+      cleaner1.lastStats = new CleanerStats(time)
+      cleaner1.lastPreCleanStats.maxCompactionDelayMs = 1_000L
+      cleaners += cleaner1
+
+      val cleaner2 = new logCleaner.CleanerThread(2)
+      cleaner2.lastStats = new CleanerStats(time)
+      cleaner2.lastPreCleanStats.maxCompactionDelayMs = 2_000L
+      cleaners += cleaner2
+
+      val cleaner3 = new logCleaner.CleanerThread(3)
+      cleaner3.lastStats = new CleanerStats(time)
+      cleaner3.lastPreCleanStats.maxCompactionDelayMs = 3_000L
+      cleaners += cleaner3
+
+      // expect the gauge value to reflect the maximum CompactionDelay
+      assertMaxCompactionDelay(3)
+
+      // Update CompactionDelay and verify the gauge value updates
+      cleaner1.lastPreCleanStats.maxCompactionDelayMs = 4_000L
+      assertMaxCompactionDelay(4)
+
+      // All CleanerThreads have the same CompactionDelay
+      cleaners.foreach(_.lastPreCleanStats.maxCompactionDelayMs = 1_500L)
+      assertMaxCompactionDelay(1)
+    } finally {
+      logCleaner.shutdown()
+    }
   }
 
   private def writeToLog(log: UnifiedLog, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {

Reply via email to