This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5fac9057496 KAFKA-17222 Remove the subclass of KafkaMetricsGroup 
(#16752)
5fac9057496 is described below

commit 5fac9057496ded1de838f6283aa7430b7794d354
Author: bboyleonp666 <[email protected]>
AuthorDate: Wed Aug 7 01:26:49 2024 +0800

    KAFKA-17222 Remove the subclass of KafkaMetricsGroup (#16752)
    
    The method overrides of metricName in KafkaMetricsGroup are no longer 
required since there's a new constructor that implements this with the same 
behavior.
    
    Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  9 ++------
 .../main/scala/kafka/network/SocketServer.scala    |  5 ++---
 core/src/main/scala/kafka/server/KafkaBroker.scala | 12 +++--------
 .../server/metadata/BrokerServerMetrics.scala      | 11 +++-------
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  8 +-------
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    |  9 +-------
 .../unit/kafka/metrics/KafkaMetricsGroupTest.scala | 24 ++++++----------------
 .../kafka/server/metrics/KafkaMetricsGroup.java    |  2 +-
 .../kafka/storage/internals/log/LogSegment.java    | 13 +++---------
 9 files changed, 22 insertions(+), 71 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 4a89ae4d56c..e6e7e469554 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import com.yammer.metrics.core.MetricName
 import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException}
 import kafka.log.LocalLog.nextOption
 import kafka.log.remote.RemoteLogManager
@@ -111,12 +110,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   import kafka.log.UnifiedLog._
 
-  private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
-    // For compatibility, metrics are defined to be under `Log` class
-    override def metricName(name: String, tags: util.Map[String, String]): 
MetricName = {
-      KafkaMetricsGroup.explicitMetricName(getClass.getPackage.getName, "Log", 
name, tags)
-    }
-  }
+  // For compatibility, metrics are defined to be under `Log` class
+  private val metricsGroup = new 
KafkaMetricsGroup(getClass.getPackage.getName, "Log")
 
   this.logIdent = s"[UnifiedLog partition=$topicPartition, dir=$parentDir] "
 
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index c9d6a86c549..72ca2baa1fa 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -608,9 +608,8 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
 
   private[network] val processors = new ArrayBuffer[Processor]()
   // Build the metric name explicitly in order to keep the existing name for 
compatibility
-  private val blockedPercentMeterMetricName = 
KafkaMetricsGroup.explicitMetricName(
-    "kafka.network",
-    "Acceptor",
+  private val backwardCompatibilityMetricGroup = new 
KafkaMetricsGroup("kafka.network", "Acceptor")
+  private val blockedPercentMeterMetricName = 
backwardCompatibilityMetricGroup.metricName(
     s"${metricPrefix()}AcceptorBlockedPercent",
     Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
   private val blockedPercentMeter = 
metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", 
TimeUnit.NANOSECONDS)
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala 
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 59a7c9c7ba0..bcdf6593b2e 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import com.yammer.metrics.core.MetricName
 import kafka.log.LogManager
 import kafka.log.remote.RemoteLogManager
 import kafka.network.SocketServer
@@ -37,7 +36,6 @@ import org.apache.kafka.server.metrics.{KafkaMetricsGroup, 
KafkaYammerMetrics, L
 import org.apache.kafka.server.util.Scheduler
 
 import java.time.Duration
-import java.util
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
@@ -103,13 +101,9 @@ trait KafkaBroker extends Logging {
   def clientToControllerChannelManager: NodeToControllerChannelManager
   def tokenCache: DelegationTokenCache
 
-  private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
-    // For backwards compatibility, we need to keep older metrics tied
-    // to their original name when this class was named `KafkaServer`
-    override def metricName(name: String, tags: util.Map[String, String]): 
MetricName = {
-      KafkaMetricsGroup.explicitMetricName(Server.MetricsPrefix, 
KafkaBroker.MetricsTypeName, name, tags)
-    }
-  }
+  // For backwards compatibility, we need to keep older metrics tied
+  // to their original name when this class was named `KafkaServer`
+  private val metricsGroup = new KafkaMetricsGroup(Server.MetricsPrefix, 
KafkaBroker.MetricsTypeName)
 
   metricsGroup.newGauge("BrokerState", () => brokerState.value)
   metricsGroup.newGauge("ClusterId", () => clusterId)
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index ad8c21f9608..78f975640b9 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -33,10 +33,8 @@ final class BrokerServerMetrics private (
 ) extends AutoCloseable {
   import BrokerServerMetrics._
 
-  private val batchProcessingTimeHistName = 
KafkaMetricsGroup.explicitMetricName("kafka.server",
-    "BrokerMetadataListener",
-    "MetadataBatchProcessingTimeUs",
-    Collections.emptyMap())
+  private val metricsGroup = new 
KafkaMetricsGroup("kafka.server","BrokerMetadataListener")
+  private val batchProcessingTimeHistName = 
metricsGroup.metricName("MetadataBatchProcessingTimeUs", Collections.emptyMap())
 
   /**
    * A histogram tracking the time in microseconds it took to process batches 
of events.
@@ -44,10 +42,7 @@ final class BrokerServerMetrics private (
   private val batchProcessingTimeHist =
     
KafkaYammerMetrics.defaultRegistry().newHistogram(batchProcessingTimeHistName, 
true)
 
-  private val batchSizeHistName = 
KafkaMetricsGroup.explicitMetricName("kafka.server",
-    "BrokerMetadataListener",
-    "MetadataBatchSizes",
-    Collections.emptyMap())
+  private val batchSizeHistName = 
metricsGroup.metricName("MetadataBatchSizes", Collections.emptyMap())
 
   /**
    * A histogram tracking the sizes of batches that we have processed.
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index cb22d3caccf..63ffc6aaedc 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -17,7 +17,6 @@
 package kafka.zk
 
 import java.util.Properties
-import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
 import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, 
ReplicaAssignment}
@@ -44,7 +43,6 @@ import org.apache.zookeeper.common.ZKConfig
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper}
 
-import java.util
 import java.lang.{Long => JLong}
 import scala.collection.{Map, Seq, mutable}
 
@@ -67,11 +65,7 @@ class KafkaZkClient private[zk] (
   enableEntityConfigControllerCheck: Boolean
 ) extends AutoCloseable with Logging {
 
-  private val metricsGroup: KafkaMetricsGroup = new 
KafkaMetricsGroup(this.getClass) {
-    override def metricName(name: String, metricTags: util.Map[String, 
String]): MetricName = {
-      KafkaMetricsGroup.explicitMetricName("kafka.server", 
"ZooKeeperClientMetrics", name, metricTags)
-    }
-  }
+  private val metricsGroup: KafkaMetricsGroup = new 
KafkaMetricsGroup("kafka.server", "ZooKeeperClientMetrics")
 
   private val latencyMetric = 
metricsGroup.newHistogram("ZooKeeperRequestLatencyMs")
 
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index daa60f4bf66..3f77acf3dbb 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -21,7 +21,6 @@ import java.util.Locale
 import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
 import java.util.concurrent._
 import java.util.{List => JList}
-import com.yammer.metrics.core.MetricName
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
 import kafka.utils.Logging
 import kafka.zookeeper.ZooKeeperClient._
@@ -36,7 +35,6 @@ import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper._
 import org.apache.zookeeper.client.ZKClientConfig
 
-import java.util
 import scala.jdk.CollectionConverters._
 import scala.collection.{Seq, mutable}
 
@@ -64,12 +62,7 @@ class ZooKeeperClient(connectString: String,
                       private[zookeeper] val clientConfig: ZKClientConfig,
                       name: String) extends Logging {
 
-  private val metricsGroup: KafkaMetricsGroup = new 
KafkaMetricsGroup(this.getClass) {
-    override def metricName(name: String, metricTags: util.Map[String, 
String]): MetricName = {
-      KafkaMetricsGroup.explicitMetricName(metricGroup, metricType, name, 
metricTags)
-    }
-  }
-
+  private val metricsGroup: KafkaMetricsGroup = new 
KafkaMetricsGroup(metricGroup, metricType)
 
   this.logIdent = s"[ZooKeeperClient $name] "
   private val initializationLock = new ReentrantReadWriteLock()
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala 
b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
index 2b0c46c5c46..75946f14075 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
@@ -29,12 +29,8 @@ class KafkaMetricsGroupTest {
 
   @Test
   def testUntaggedMetricName(): Unit = {
-    val metricName = KafkaMetricsGroup.explicitMetricName(
-      "kafka.metrics",
-      "TestMetrics",
-      "TaggedMetric",
-      Collections.emptyMap()
-    )
+    val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
+    val metricName = metricsGroup.metricName("TaggedMetric", 
Collections.emptyMap())
 
     assertEquals("kafka.metrics", metricName.getGroup)
     assertEquals("TestMetrics", metricName.getType)
@@ -47,12 +43,8 @@ class KafkaMetricsGroupTest {
   @Test
   def testTaggedMetricName(): Unit = {
     val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz").asJava
-    val metricName = KafkaMetricsGroup.explicitMetricName(
-      "kafka.metrics",
-      "TestMetrics",
-      "TaggedMetric",
-      tags
-    )
+    val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
+    val metricName = metricsGroup.metricName("TaggedMetric", tags)
 
     assertEquals("kafka.metrics", metricName.getGroup)
     assertEquals("TestMetrics", metricName.getType)
@@ -65,12 +57,8 @@ class KafkaMetricsGroupTest {
   @Test
   def testTaggedMetricNameWithEmptyValue(): Unit = {
     val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz").asJava
-    val metricName = KafkaMetricsGroup.explicitMetricName(
-      "kafka.metrics",
-      "TestMetrics",
-      "TaggedMetric",
-      tags
-    )
+    val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
+    val metricName = metricsGroup.metricName("TaggedMetric", tags)
 
     assertEquals("kafka.metrics", metricName.getGroup)
     assertEquals("TestMetrics", metricName.getType)
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
 
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
index 5c5607db737..9a01b2ac247 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
@@ -61,7 +61,7 @@ public class KafkaMetricsGroup {
         return explicitMetricName(this.pkg, this.simpleName, name, tags);
     }
 
-    public static MetricName explicitMetricName(String group, String typeName,
+    private static MetricName explicitMetricName(String group, String typeName,
                                                 String name, Map<String, 
String> tags) {
         StringBuilder nameBuilder = new StringBuilder(100);
         nameBuilder.append(group);
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 2113bf1de29..3f9c8b3989d 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 
-import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.Timer;
 
 import org.slf4j.Logger;
@@ -44,7 +43,6 @@ import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.attribute.FileTime;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.Callable;
@@ -72,14 +70,9 @@ public class LogSegment implements Closeable {
     private static final Pattern FUTURE_DIR_PATTERN = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
 
     static {
-        KafkaMetricsGroup logFlushStatsMetricsGroup = new 
KafkaMetricsGroup(LogSegment.class) {
-            @Override
-            public MetricName metricName(String name, Map<String, String> 
tags) {
-                // Override the group and type names for compatibility - this 
metrics group was previously defined within
-                // a Scala object named `kafka.log.LogFlushStats`
-                return KafkaMetricsGroup.explicitMetricName("kafka.log", 
"LogFlushStats", name, tags);
-            }
-        };
+        // For compatibility - this metrics group was previously defined within
+        // a Scala object named `kafka.log.LogFlushStats`
+        KafkaMetricsGroup logFlushStatsMetricsGroup = new 
KafkaMetricsGroup("kafka.log", "LogFlushStats");
         LOG_FLUSH_TIMER = 
logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
     }
 

Reply via email to