This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5fac9057496 KAFKA-17222 Remove the subclass of KafkaMetricsGroup
(#16752)
5fac9057496 is described below
commit 5fac9057496ded1de838f6283aa7430b7794d354
Author: bboyleonp666 <[email protected]>
AuthorDate: Wed Aug 7 01:26:49 2024 +0800
KAFKA-17222 Remove the subclass of KafkaMetricsGroup (#16752)
The method overrides of metricName in KafkaMetricsGroup are no longer
required since there's a new constructor that implement this with the same
behavior.
Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 9 ++------
.../main/scala/kafka/network/SocketServer.scala | 5 ++---
core/src/main/scala/kafka/server/KafkaBroker.scala | 12 +++--------
.../server/metadata/BrokerServerMetrics.scala | 11 +++-------
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 8 +-------
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 9 +-------
.../unit/kafka/metrics/KafkaMetricsGroupTest.scala | 24 ++++++----------------
.../kafka/server/metrics/KafkaMetricsGroup.java | 2 +-
.../kafka/storage/internals/log/LogSegment.java | 13 +++---------
9 files changed, 22 insertions(+), 71 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 4a89ae4d56c..e6e7e469554 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -17,7 +17,6 @@
package kafka.log
-import com.yammer.metrics.core.MetricName
import kafka.common.{OffsetsOutOfOrderException,
UnexpectedAppendOffsetException}
import kafka.log.LocalLog.nextOption
import kafka.log.remote.RemoteLogManager
@@ -111,12 +110,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
import kafka.log.UnifiedLog._
- private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
- // For compatibility, metrics are defined to be under `Log` class
- override def metricName(name: String, tags: util.Map[String, String]):
MetricName = {
- KafkaMetricsGroup.explicitMetricName(getClass.getPackage.getName, "Log",
name, tags)
- }
- }
+ // For compatibility, metrics are defined to be under `Log` class
+ private val metricsGroup = new
KafkaMetricsGroup(getClass.getPackage.getName, "Log")
this.logIdent = s"[UnifiedLog partition=$topicPartition, dir=$parentDir] "
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index c9d6a86c549..72ca2baa1fa 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -608,9 +608,8 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private[network] val processors = new ArrayBuffer[Processor]()
// Build the metric name explicitly in order to keep the existing name for
compatibility
- private val blockedPercentMeterMetricName =
KafkaMetricsGroup.explicitMetricName(
- "kafka.network",
- "Acceptor",
+ private val backwardCompatibilityMetricGroup = new
KafkaMetricsGroup("kafka.network", "Acceptor")
+ private val blockedPercentMeterMetricName =
backwardCompatibilityMetricGroup.metricName(
s"${metricPrefix()}AcceptorBlockedPercent",
Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
private val blockedPercentMeter =
metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time",
TimeUnit.NANOSECONDS)
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 59a7c9c7ba0..bcdf6593b2e 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -17,7 +17,6 @@
package kafka.server
-import com.yammer.metrics.core.MetricName
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.SocketServer
@@ -37,7 +36,6 @@ import org.apache.kafka.server.metrics.{KafkaMetricsGroup,
KafkaYammerMetrics, L
import org.apache.kafka.server.util.Scheduler
import java.time.Duration
-import java.util
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -103,13 +101,9 @@ trait KafkaBroker extends Logging {
def clientToControllerChannelManager: NodeToControllerChannelManager
def tokenCache: DelegationTokenCache
- private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
- // For backwards compatibility, we need to keep older metrics tied
- // to their original name when this class was named `KafkaServer`
- override def metricName(name: String, tags: util.Map[String, String]):
MetricName = {
- KafkaMetricsGroup.explicitMetricName(Server.MetricsPrefix,
KafkaBroker.MetricsTypeName, name, tags)
- }
- }
+ // For backwards compatibility, we need to keep older metrics tied
+ // to their original name when this class was named `KafkaServer`
+ private val metricsGroup = new KafkaMetricsGroup(Server.MetricsPrefix,
KafkaBroker.MetricsTypeName)
metricsGroup.newGauge("BrokerState", () => brokerState.value)
metricsGroup.newGauge("ClusterId", () => clusterId)
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index ad8c21f9608..78f975640b9 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -33,10 +33,8 @@ final class BrokerServerMetrics private (
) extends AutoCloseable {
import BrokerServerMetrics._
- private val batchProcessingTimeHistName =
KafkaMetricsGroup.explicitMetricName("kafka.server",
- "BrokerMetadataListener",
- "MetadataBatchProcessingTimeUs",
- Collections.emptyMap())
+ private val metricsGroup = new
KafkaMetricsGroup("kafka.server","BrokerMetadataListener")
+ private val batchProcessingTimeHistName =
metricsGroup.metricName("MetadataBatchProcessingTimeUs", Collections.emptyMap())
/**
* A histogram tracking the time in microseconds it took to process batches
of events.
@@ -44,10 +42,7 @@ final class BrokerServerMetrics private (
private val batchProcessingTimeHist =
KafkaYammerMetrics.defaultRegistry().newHistogram(batchProcessingTimeHistName,
true)
- private val batchSizeHistName =
KafkaMetricsGroup.explicitMetricName("kafka.server",
- "BrokerMetadataListener",
- "MetadataBatchSizes",
- Collections.emptyMap())
+ private val batchSizeHistName =
metricsGroup.metricName("MetadataBatchSizes", Collections.emptyMap())
/**
* A histogram tracking the sizes of batches that we have processed.
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index cb22d3caccf..63ffc6aaedc 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -17,7 +17,6 @@
package kafka.zk
import java.util.Properties
-import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch,
ReplicaAssignment}
@@ -44,7 +43,6 @@ import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper}
-import java.util
import java.lang.{Long => JLong}
import scala.collection.{Map, Seq, mutable}
@@ -67,11 +65,7 @@ class KafkaZkClient private[zk] (
enableEntityConfigControllerCheck: Boolean
) extends AutoCloseable with Logging {
- private val metricsGroup: KafkaMetricsGroup = new
KafkaMetricsGroup(this.getClass) {
- override def metricName(name: String, metricTags: util.Map[String,
String]): MetricName = {
- KafkaMetricsGroup.explicitMetricName("kafka.server",
"ZooKeeperClientMetrics", name, metricTags)
- }
- }
+ private val metricsGroup: KafkaMetricsGroup = new
KafkaMetricsGroup("kafka.server", "ZooKeeperClientMetrics")
private val latencyMetric =
metricsGroup.newHistogram("ZooKeeperRequestLatencyMs")
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index daa60f4bf66..3f77acf3dbb 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -21,7 +21,6 @@ import java.util.Locale
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
import java.util.concurrent._
import java.util.{List => JList}
-import com.yammer.metrics.core.MetricName
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.Logging
import kafka.zookeeper.ZooKeeperClient._
@@ -36,7 +35,6 @@ import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper._
import org.apache.zookeeper.client.ZKClientConfig
-import java.util
import scala.jdk.CollectionConverters._
import scala.collection.{Seq, mutable}
@@ -64,12 +62,7 @@ class ZooKeeperClient(connectString: String,
private[zookeeper] val clientConfig: ZKClientConfig,
name: String) extends Logging {
- private val metricsGroup: KafkaMetricsGroup = new
KafkaMetricsGroup(this.getClass) {
- override def metricName(name: String, metricTags: util.Map[String,
String]): MetricName = {
- KafkaMetricsGroup.explicitMetricName(metricGroup, metricType, name,
metricTags)
- }
- }
-
+ private val metricsGroup: KafkaMetricsGroup = new
KafkaMetricsGroup(metricGroup, metricType)
this.logIdent = s"[ZooKeeperClient $name] "
private val initializationLock = new ReentrantReadWriteLock()
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
index 2b0c46c5c46..75946f14075 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
@@ -29,12 +29,8 @@ class KafkaMetricsGroupTest {
@Test
def testUntaggedMetricName(): Unit = {
- val metricName = KafkaMetricsGroup.explicitMetricName(
- "kafka.metrics",
- "TestMetrics",
- "TaggedMetric",
- Collections.emptyMap()
- )
+ val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
+ val metricName = metricsGroup.metricName("TaggedMetric",
Collections.emptyMap())
assertEquals("kafka.metrics", metricName.getGroup)
assertEquals("TestMetrics", metricName.getType)
@@ -47,12 +43,8 @@ class KafkaMetricsGroupTest {
@Test
def testTaggedMetricName(): Unit = {
val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz").asJava
- val metricName = KafkaMetricsGroup.explicitMetricName(
- "kafka.metrics",
- "TestMetrics",
- "TaggedMetric",
- tags
- )
+ val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
+ val metricName = metricsGroup.metricName("TaggedMetric", tags)
assertEquals("kafka.metrics", metricName.getGroup)
assertEquals("TestMetrics", metricName.getType)
@@ -65,12 +57,8 @@ class KafkaMetricsGroupTest {
@Test
def testTaggedMetricNameWithEmptyValue(): Unit = {
val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz").asJava
- val metricName = KafkaMetricsGroup.explicitMetricName(
- "kafka.metrics",
- "TestMetrics",
- "TaggedMetric",
- tags
- )
+ val metricsGroup = new KafkaMetricsGroup("kafka.metrics", "TestMetrics")
+ val metricName = metricsGroup.metricName("TaggedMetric", tags)
assertEquals("kafka.metrics", metricName.getGroup)
assertEquals("TestMetrics", metricName.getType)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
index 5c5607db737..9a01b2ac247 100644
---
a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
+++
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
@@ -61,7 +61,7 @@ public class KafkaMetricsGroup {
return explicitMetricName(this.pkg, this.simpleName, name, tags);
}
- public static MetricName explicitMetricName(String group, String typeName,
+ private static MetricName explicitMetricName(String group, String typeName,
String name, Map<String,
String> tags) {
StringBuilder nameBuilder = new StringBuilder(100);
nameBuilder.append(group);
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 2113bf1de29..3f9c8b3989d 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
-import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import org.slf4j.Logger;
@@ -44,7 +43,6 @@ import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.attribute.FileTime;
import java.util.Iterator;
-import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Callable;
@@ -72,14 +70,9 @@ public class LogSegment implements Closeable {
private static final Pattern FUTURE_DIR_PATTERN =
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
static {
- KafkaMetricsGroup logFlushStatsMetricsGroup = new
KafkaMetricsGroup(LogSegment.class) {
- @Override
- public MetricName metricName(String name, Map<String, String>
tags) {
- // Override the group and type names for compatibility - this
metrics group was previously defined within
- // a Scala object named `kafka.log.LogFlushStats`
- return KafkaMetricsGroup.explicitMetricName("kafka.log",
"LogFlushStats", name, tags);
- }
- };
+ // For compatibility - this metrics group was previously defined within
+ // a Scala object named `kafka.log.LogFlushStats`
+ KafkaMetricsGroup logFlushStatsMetricsGroup = new
KafkaMetricsGroup("kafka.log", "LogFlushStats");
LOG_FLUSH_TIMER =
logFlushStatsMetricsGroup.newTimer("LogFlushRateAndTimeMs",
TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
}