This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e28e0bf0f2c KAFKA-14524: Rewrite KafkaMetricsGroup in Java (#13067)
e28e0bf0f2c is described below

commit e28e0bf0f2c21206abccfffb280605dd02404678
Author: Ivan Yurchenko <[email protected]>
AuthorDate: Wed Mar 8 10:29:51 2023 +0000

    KAFKA-14524: Rewrite KafkaMetricsGroup in Java (#13067)
    
    * KAFKA-14524: Rewrite KafkaMetricsGroup in Java
    
    Instead of being a base trait that classes mix in, `KafkaMetricsGroup` is now a 
standalone class that each user instantiates. User classes can still override its 
methods to adjust its behavior, as they did with the trait model.
    
    Some classes extended the `KafkaMetricsGroup` trait without actually using 
it; the trait is simply dropped from those classes.
    
    Reviewers: Ismael Juma <[email protected]>, lbownik 
<[email protected]>, Satish Duggana <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala  |  38 +++---
 .../controller/ControllerChannelManager.scala      |  18 +--
 .../kafka/controller/ControllerEventManager.scala  |  14 +-
 .../scala/kafka/controller/KafkaController.scala   |  42 +++---
 .../coordinator/group/GroupMetadataManager.scala   |  10 +-
 .../TransactionMarkerChannelManager.scala          |  10 +-
 core/src/main/scala/kafka/log/LocalLog.scala       |   3 +-
 core/src/main/scala/kafka/log/LogCleaner.scala     |  17 +--
 .../main/scala/kafka/log/LogCleanerManager.scala   |  18 +--
 core/src/main/scala/kafka/log/LogManager.scala     |  28 ++--
 core/src/main/scala/kafka/log/LogSegment.scala     |   7 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  35 ++---
 .../scala/kafka/log/remote/RemoteLogManager.scala  |   3 +-
 .../scala/kafka/metrics/KafkaMetricsGroup.scala    | 111 ----------------
 .../main/scala/kafka/network/RequestChannel.scala  |  86 +++++++-----
 .../main/scala/kafka/network/SocketServer.scala    |  40 +++---
 .../kafka/server/AbstractFetcherManager.scala      |  17 ++-
 .../scala/kafka/server/AbstractFetcherThread.scala |  25 ++--
 .../scala/kafka/server/AlterPartitionManager.scala |   3 +-
 .../main/scala/kafka/server/ControllerServer.scala |  16 ++-
 .../scala/kafka/server/DelayedDeleteRecords.scala  |   8 +-
 .../src/main/scala/kafka/server/DelayedFetch.scala |  10 +-
 .../main/scala/kafka/server/DelayedOperation.scala |  17 ++-
 .../main/scala/kafka/server/DelayedProduce.scala   |  12 +-
 .../kafka/server/DelegationTokenManager.scala      |   3 +-
 .../src/main/scala/kafka/server/FetchSession.scala |  24 ++--
 core/src/main/scala/kafka/server/KafkaBroker.scala |  28 ++--
 .../scala/kafka/server/KafkaRequestHandler.scala   |  22 +--
 .../main/scala/kafka/server/ReplicaManager.scala   |  47 +++----
 .../main/scala/kafka/server/ZkAdminManager.scala   |   3 +-
 .../server/metadata/BrokerMetadataListener.scala   |   5 +-
 .../server/metadata/BrokerServerMetrics.scala      |  15 +--
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |   9 +-
 core/src/main/scala/kafka/utils/Throttler.scala    |  10 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  15 ++-
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    |  22 +--
 .../group/GroupMetadataManagerTest.scala           |   2 +-
 .../unit/kafka/log/LogCleanerIntegrationTest.scala |   3 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   2 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   8 +-
 .../unit/kafka/metrics/KafkaMetricsGroupTest.scala |  29 ++--
 .../scala/unit/kafka/metrics/MetricsTest.scala     |   3 +-
 .../unit/kafka/network/ConnectionQuotasTest.scala  |   6 +-
 .../kafka/server/metrics/KafkaMetricsGroup.java    | 147 +++++++++++++++++++++
 44 files changed, 555 insertions(+), 436 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 80af26eda7f..1a0a6ab1b58 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,7 +23,6 @@ import kafka.api.LeaderAndIsr
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
@@ -45,6 +44,7 @@ import org.apache.kafka.common.{IsolationLevel, 
TopicPartition, Uuid}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, 
LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, 
LogStartOffsetIncrementReason}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
@@ -100,7 +100,9 @@ class DelayedOperations(topicPartition: TopicPartition,
   def numDelayedDelete: Int = deleteRecords.numDelayed
 }
 
-object Partition extends KafkaMetricsGroup {
+object Partition {
+  private val metricsGroup = new KafkaMetricsGroup(classOf[Partition])
+
   def apply(topicPartition: TopicPartition,
             time: Time,
             replicaManager: ReplicaManager): Partition = {
@@ -136,13 +138,13 @@ object Partition extends KafkaMetricsGroup {
   }
 
   def removeMetrics(topicPartition: TopicPartition): Unit = {
-    val tags = Map("topic" -> topicPartition.topic, "partition" -> 
topicPartition.partition.toString)
-    removeMetric("UnderReplicated", tags)
-    removeMetric("UnderMinIsr", tags)
-    removeMetric("InSyncReplicasCount", tags)
-    removeMetric("ReplicasCount", tags)
-    removeMetric("LastStableOffsetLag", tags)
-    removeMetric("AtMinIsr", tags)
+    val tags = Map("topic" -> topicPartition.topic, "partition" -> 
topicPartition.partition.toString).asJava
+    metricsGroup.removeMetric("UnderReplicated", tags)
+    metricsGroup.removeMetric("UnderMinIsr", tags)
+    metricsGroup.removeMetric("InSyncReplicasCount", tags)
+    metricsGroup.removeMetric("ReplicasCount", tags)
+    metricsGroup.removeMetric("LastStableOffsetLag", tags)
+    metricsGroup.removeMetric("AtMinIsr", tags)
   }
 }
 
@@ -284,7 +286,9 @@ class Partition(val topicPartition: TopicPartition,
                 delayedOperations: DelayedOperations,
                 metadataCache: MetadataCache,
                 logManager: LogManager,
-                alterIsrManager: AlterPartitionManager) extends Logging with 
KafkaMetricsGroup {
+                alterIsrManager: AlterPartitionManager) extends Logging {
+
+  import Partition.metricsGroup
 
   def topic: String = topicPartition.topic
   def partitionId: Int = topicPartition.partition
@@ -334,14 +338,14 @@ class Partition(val topicPartition: TopicPartition,
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
-  private val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
+  private val tags = Map("topic" -> topic, "partition" -> 
partitionId.toString).asJava
 
-  newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)
-  newGauge("InSyncReplicasCount", () => if (isLeader) partitionState.isr.size 
else 0, tags)
-  newGauge("UnderMinIsr", () => if (isUnderMinIsr) 1 else 0, tags)
-  newGauge("AtMinIsr", () => if (isAtMinIsr) 1 else 0, tags)
-  newGauge("ReplicasCount", () => if (isLeader) 
assignmentState.replicationFactor else 0, tags)
-  newGauge("LastStableOffsetLag", () => 
log.map(_.lastStableOffsetLag).getOrElse(0), tags)
+  metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 
0, tags)
+  metricsGroup.newGauge("InSyncReplicasCount", () => if (isLeader) 
partitionState.isr.size else 0, tags)
+  metricsGroup.newGauge("UnderMinIsr", () => if (isUnderMinIsr) 1 else 0, tags)
+  metricsGroup.newGauge("AtMinIsr", () => if (isAtMinIsr) 1 else 0, tags)
+  metricsGroup.newGauge("ReplicasCount", () => if (isLeader) 
assignmentState.replicationFactor else 0, tags)
+  metricsGroup.newGauge("LastStableOffsetLag", () => 
log.map(_.lastStableOffsetLag).getOrElse(0), tags)
 
   def hasLateTransaction(currentTimeMs: Long): Boolean = 
leaderLogIfLocal.exists(_.hasLateTransaction(currentTimeMs))
 
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 244c61d55a2..2d413a4eb20 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,6 @@ package kafka.controller
 import com.yammer.metrics.core.{Gauge, Timer}
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils.Implicits._
 import kafka.utils._
@@ -37,6 +36,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion._
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
 
 import java.net.SocketTimeoutException
@@ -54,14 +54,16 @@ class ControllerChannelManager(controllerEpoch: () => Int,
                                time: Time,
                                metrics: Metrics,
                                stateChangeLogger: StateChangeLogger,
-                               threadNamePrefix: Option[String] = None) 
extends Logging with KafkaMetricsGroup {
+                               threadNamePrefix: Option[String] = None) 
extends Logging {
   import ControllerChannelManager._
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   protected val brokerStateInfo = new mutable.HashMap[Int, 
ControllerBrokerStateInfo]
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
-  newGauge("TotalQueueSize",
+  metricsGroup.newGauge("TotalQueueSize",
     () => brokerLock synchronized {
       brokerStateInfo.values.iterator.map(_.messageQueue.size).sum
     }
@@ -169,7 +171,7 @@ class ControllerChannelManager(controllerEpoch: () => Int,
       case Some(name) => 
s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
     }
 
-    val requestRateAndQueueTimeMetrics = newTimer(
+    val requestRateAndQueueTimeMetrics = metricsGroup.newTimer(
       RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, 
TimeUnit.SECONDS, brokerMetricTags(broker.id)
     )
 
@@ -177,13 +179,13 @@ class ControllerChannelManager(controllerEpoch: () => Int,
       brokerNode, config, time, requestRateAndQueueTimeMetrics, 
stateChangeLogger, threadName)
     requestThread.setDaemon(false)
 
-    val queueSizeGauge = newGauge(QueueSizeMetricName, () => 
messageQueue.size, brokerMetricTags(broker.id))
+    val queueSizeGauge = metricsGroup.newGauge(QueueSizeMetricName, () => 
messageQueue.size, brokerMetricTags(broker.id))
 
     brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, 
brokerNode, messageQueue,
       requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, 
reconfigurableChannelBuilder))
   }
 
-  private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> 
brokerId.toString)
+  private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> 
brokerId.toString).asJava
 
   private def removeExistingBroker(brokerState: ControllerBrokerStateInfo): 
Unit = {
     try {
@@ -195,8 +197,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
       brokerState.requestSendThread.shutdown()
       brokerState.networkClient.close()
       brokerState.messageQueue.clear()
-      removeMetric(QueueSizeMetricName, 
brokerMetricTags(brokerState.brokerNode.id))
-      removeMetric(RequestRateAndQueueTimeMetricName, 
brokerMetricTags(brokerState.brokerNode.id))
+      metricsGroup.removeMetric(QueueSizeMetricName, 
brokerMetricTags(brokerState.brokerNode.id))
+      metricsGroup.removeMetric(RequestRateAndQueueTimeMetricName, 
brokerMetricTags(brokerState.brokerNode.id))
       brokerStateInfo.remove(brokerState.brokerNode.id)
     } catch {
       case e: Throwable => error("Error while removing broker by the 
controller", e)
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala 
b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index d8b2af41303..a1b4835ea97 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -23,10 +23,10 @@ import java.util.ArrayList
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantLock
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.Logging
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
 
 import scala.collection._
@@ -73,18 +73,20 @@ class ControllerEventManager(controllerId: Int,
                              processor: ControllerEventProcessor,
                              time: Time,
                              rateAndTimeMetrics: Map[ControllerState, Timer],
-                             eventQueueTimeTimeoutMs: Long = 300000) extends 
KafkaMetricsGroup {
+                             eventQueueTimeTimeoutMs: Long = 300000) {
   import ControllerEventManager._
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
   private[controller] var thread = new 
ControllerEventThread(ControllerEventThreadName)
 
-  private val eventQueueTimeHist = newHistogram(EventQueueTimeMetricName)
+  private val eventQueueTimeHist = 
metricsGroup.newHistogram(EventQueueTimeMetricName)
 
-  newGauge(EventQueueSizeMetricName, () => queue.size)
+  metricsGroup.newGauge(EventQueueSizeMetricName, () => queue.size)
 
   def state: ControllerState = _state
 
@@ -96,8 +98,8 @@ class ControllerEventManager(controllerId: Int,
       clearAndPut(ShutdownEventThread)
       thread.awaitShutdown()
     } finally {
-      removeMetric(EventQueueTimeMetricName)
-      removeMetric(EventQueueSizeMetricName)
+      metricsGroup.removeMetric(EventQueueTimeMetricName)
+      metricsGroup.removeMetric(EventQueueSizeMetricName)
     }
   }
 
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index ebe768d4fb1..64ae5ad39c1 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -25,7 +25,6 @@ import kafka.common._
 import kafka.cluster.Broker
 import kafka.controller.KafkaController.{AlterReassignmentsCallback, 
ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback}
 import kafka.coordinator.transaction.ZkProducerIdManager
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server._
 import kafka.server.metadata.ZkFinalizedFeatureCache
 import kafka.utils._
@@ -47,6 +46,7 @@ import 
org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, Leade
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.ProducerIdsBlock
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.KafkaScheduler
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
@@ -81,7 +81,9 @@ class KafkaController(val config: KafkaConfig,
                       brokerFeatures: BrokerFeatures,
                       featureCache: ZkFinalizedFeatureCache,
                       threadNamePrefix: Option[String] = None)
-  extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
+  extends ControllerEventProcessor with Logging {
+
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   this.logIdent = s"[Controller id=${config.brokerId}] "
 
@@ -142,19 +144,19 @@ class KafkaController(val config: KafkaConfig,
   /* single-thread scheduler to clean expired tokens */
   private val tokenCleanScheduler = new KafkaScheduler(1, true, 
"delegation-token-cleaner")
 
-  newGauge("ActiveControllerCount", () => if (isActive) 1 else 0)
-  newGauge("OfflinePartitionsCount", () => offlinePartitionCount)
-  newGauge("PreferredReplicaImbalanceCount", () => 
preferredReplicaImbalanceCount)
-  newGauge("ControllerState", () => state.value)
-  newGauge("GlobalTopicCount", () => globalTopicCount)
-  newGauge("GlobalPartitionCount", () => globalPartitionCount)
-  newGauge("TopicsToDeleteCount", () => topicsToDeleteCount)
-  newGauge("ReplicasToDeleteCount", () => replicasToDeleteCount)
-  newGauge("TopicsIneligibleToDeleteCount", () => 
ineligibleTopicsToDeleteCount)
-  newGauge("ReplicasIneligibleToDeleteCount", () => 
ineligibleReplicasToDeleteCount)
-  newGauge("ActiveBrokerCount", () => activeBrokerCount)
+  metricsGroup.newGauge("ActiveControllerCount", () => if (isActive) 1 else 0)
+  metricsGroup.newGauge("OfflinePartitionsCount", () => offlinePartitionCount)
+  metricsGroup.newGauge("PreferredReplicaImbalanceCount", () => 
preferredReplicaImbalanceCount)
+  metricsGroup.newGauge("ControllerState", () => state.value)
+  metricsGroup.newGauge("GlobalTopicCount", () => globalTopicCount)
+  metricsGroup.newGauge("GlobalPartitionCount", () => globalPartitionCount)
+  metricsGroup.newGauge("TopicsToDeleteCount", () => topicsToDeleteCount)
+  metricsGroup.newGauge("ReplicasToDeleteCount", () => replicasToDeleteCount)
+  metricsGroup.newGauge("TopicsIneligibleToDeleteCount", () => 
ineligibleTopicsToDeleteCount)
+  metricsGroup.newGauge("ReplicasIneligibleToDeleteCount", () => 
ineligibleReplicasToDeleteCount)
+  metricsGroup.newGauge("ActiveBrokerCount", () => activeBrokerCount)
   // FencedBrokerCount metric is always 0 in the ZK controller.
-  newGauge("FencedBrokerCount", () => 0)
+  metricsGroup.newGauge("FencedBrokerCount", () => 0)
 
   /**
    * Returns true if this broker is the current controller.
@@ -2727,15 +2729,21 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: 
LeaderAndIsr, controllerEpo
   }
 }
 
-private[controller] class ControllerStats extends KafkaMetricsGroup {
-  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", 
"elections", TimeUnit.SECONDS)
+private[controller] class ControllerStats {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
+  val uncleanLeaderElectionRate = 
metricsGroup.newMeter("UncleanLeaderElectionsPerSec", "elections", 
TimeUnit.SECONDS)
 
   val rateAndTimeMetrics: Map[ControllerState, Timer] = 
ControllerState.values.flatMap { state =>
     state.rateAndTimeMetricName.map { metricName =>
-      state -> newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+      state -> metricsGroup.newTimer(metricName, TimeUnit.MILLISECONDS, 
TimeUnit.SECONDS)
     }
   }.toMap
 
+  // For test.
+  def removeMetric(name: String): Unit = {
+    metricsGroup.removeMetric(name)
+  }
 }
 
 sealed trait ControllerEvent {
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3e0317fbd9f..b45ee5f4736 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.ConcurrentHashMap
 import com.yammer.metrics.core.Gauge
 import kafka.common.OffsetAndMetadata
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{ReplicaManager, RequestLocal}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.Implicits._
@@ -46,6 +45,7 @@ import org.apache.kafka.common.{KafkaException, 
MessageFormatter, TopicPartition
 import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, 
OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, 
IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.KafkaScheduler
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation}
 
@@ -58,7 +58,9 @@ class GroupMetadataManager(brokerId: Int,
                            config: OffsetConfig,
                            val replicaManager: ReplicaManager,
                            time: Time,
-                           metrics: Metrics) extends Logging with 
KafkaMetricsGroup {
+                           metrics: Metrics) extends Logging {
+  // Visible for test.
+  private[group] val metricsGroup: KafkaMetricsGroup = new 
KafkaMetricsGroup(this.getClass)
 
   private val compressionType: CompressionType = 
config.offsetsTopicCompressionType
 
@@ -123,8 +125,8 @@ class GroupMetadataManager(brokerId: Int,
   this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "
 
   private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
-    removeMetric(name)
-    newGauge(name, gauge)
+    metricsGroup.removeMetric(name)
+    metricsGroup.newGauge(name, gauge)
   }
 
   recreateGauge("NumOffsets",
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 94baf0f976d..049aae97c7e 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, 
LinkedBlockingQueue}
 
 import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{KafkaConfig, MetadataCache, RequestLocal}
 import kafka.utils.Implicits._
 import kafka.utils.{CoreUtils, Logging}
@@ -35,6 +34,7 @@ import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import scala.collection.{concurrent, immutable}
 import scala.jdk.CollectionConverters._
@@ -133,7 +133,9 @@ class TransactionMarkerChannelManager(
   txnStateManager: TransactionStateManager,
   time: Time
 ) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, 
networkClient, config.requestTimeoutMs, time)
-  with Logging with KafkaMetricsGroup {
+  with Logging {
+
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + 
"]: "
 
@@ -151,8 +153,8 @@ class TransactionMarkerChannelManager(
     if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1
     else 0
 
-  newGauge("UnknownDestinationQueueSize", () => 
markersQueueForUnknownBroker.totalNumMarkers)
-  newGauge("LogAppendRetryQueueSize", () => txnLogAppendRetryQueue.size)
+  metricsGroup.newGauge("UnknownDestinationQueueSize", () => 
markersQueueForUnknownBroker.totalNumMarkers)
+  metricsGroup.newGauge("LogAppendRetryQueueSize", () => 
txnLogAppendRetryQueue.size)
 
   override def shutdown(): Unit = {
     super.shutdown()
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala 
b/core/src/main/scala/kafka/log/LocalLog.scala
index 97d3f5db685..34442e3c3f5 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.{KafkaStorageException, 
OffsetOutOfRangeException}
 import org.apache.kafka.common.message.FetchResponseData
@@ -69,7 +68,7 @@ class LocalLog(@volatile private var _dir: File,
                private[log] val scheduler: Scheduler,
                private[log] val time: Time,
                private[log] val topicPartition: TopicPartition,
-               private[log] val logDirFailureChannel: LogDirFailureChannel) 
extends Logging with KafkaMetricsGroup {
+               private[log] val logDirFailureChannel: LogDirFailureChannel) 
extends Logging {
 
   import kafka.log.LocalLog._
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index f03211b2bdb..25511976e31 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -22,7 +22,6 @@ import java.nio._
 import java.util.Date
 import java.util.concurrent.TimeUnit
 import kafka.common._
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -32,6 +31,7 @@ import 
org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, 
LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
 
@@ -96,8 +96,9 @@ class LogCleaner(initialConfig: CleanerConfig,
                  val logDirs: Seq[File],
                  val logs: Pool[TopicPartition, UnifiedLog],
                  val logDirFailureChannel: LogDirFailureChannel,
-                 time: Time = Time.SYSTEM) extends Logging with 
KafkaMetricsGroup with BrokerReconfigurable
-{
+                 time: Time = Time.SYSTEM) extends Logging with 
BrokerReconfigurable {
+  // Visible for test.
+  private[log] val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   /* Log cleaner configuration which may be dynamically updated */
   @volatile private var config = initialConfig
@@ -125,27 +126,27 @@ class LogCleaner(initialConfig: CleanerConfig,
 
 
   /* a metric to track the maximum utilization of any thread's buffer in the 
last cleaning */
-  newGauge("max-buffer-utilization-percent",
+  metricsGroup.newGauge("max-buffer-utilization-percent",
     () => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
 
   /* a metric to track the recopy rate of each thread's last cleaning */
-  newGauge("cleaner-recopy-percent", () => {
+  metricsGroup.newGauge("cleaner-recopy-percent", () => {
     val stats = cleaners.map(_.lastStats)
     val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble / 
math.max(stats.iterator.map(_.bytesRead).sum, 1)
     (100 * recopyRate).toInt
   })
 
   /* a metric to track the maximum cleaning time for the last cleaning from 
each thread */
-  newGauge("max-clean-time-secs",
+  metricsGroup.newGauge("max-clean-time-secs",
     () => maxOverCleanerThreads(_.lastStats.elapsedSecs))
 
 
   // a metric to track delay between the time when a log is required to be 
compacted
   // as determined by max compaction lag and the time of last cleaner run.
-  newGauge("max-compaction-delay-secs",
+  metricsGroup.newGauge("max-compaction-delay-secs",
     () => 
maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
 
-  newGauge("DeadThreadCount", () => deadThreadCount)
+  metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount)
 
   private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
 
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala 
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index db2738ba6d6..ef5df50ca8f 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import kafka.common.LogCleaningAbortedException
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, Pool}
@@ -30,8 +29,10 @@ import org.apache.kafka.common.{KafkaException, 
TopicPartition}
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import scala.collection.{Iterable, Seq, mutable}
+import scala.jdk.CollectionConverters._
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -60,9 +61,10 @@ private[log] class LogCleaningException(val log: UnifiedLog,
   */
 private[log] class LogCleanerManager(val logDirs: Seq[File],
                                      val logs: Pool[TopicPartition, 
UnifiedLog],
-                                     val logDirFailureChannel: 
LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
+                                     val logDirFailureChannel: 
LogDirFailureChannel) extends Logging {
   import LogCleanerManager._
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   protected override def loggerName: String = classOf[LogCleaner].getName
 
@@ -88,15 +90,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 
   /* gauges for tracking the number of partitions marked as uncleanable for 
each log directory */
   for (dir <- logDirs) {
-    newGauge("uncleanable-partitions-count",
+    metricsGroup.newGauge("uncleanable-partitions-count",
       () => inLock(lock) { 
uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
-      Map("logDirectory" -> dir.getAbsolutePath)
+      Map("logDirectory" -> dir.getAbsolutePath).asJava
     )
   }
 
   /* gauges for tracking the number of uncleanable bytes from uncleanable 
partitions for each log directory */
   for (dir <- logDirs) {
-    newGauge("uncleanable-bytes",
+    metricsGroup.newGauge("uncleanable-bytes",
       () => inLock(lock) {
         uncleanablePartitions.get(dir.getAbsolutePath) match {
           case Some(partitions) =>
@@ -114,17 +116,17 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
           case None => 0
         }
       },
-      Map("logDirectory" -> dir.getAbsolutePath)
+      Map("logDirectory" -> dir.getAbsolutePath).asJava
     )
   }
 
   /* a gauge for tracking the cleanable ratio of the dirtiest log */
   @volatile private var dirtiestLogCleanableRatio = 0.0
-  newGauge("max-dirty-percent", () => (100 * dirtiestLogCleanableRatio).toInt)
+  metricsGroup.newGauge("max-dirty-percent", () => (100 * 
dirtiestLogCleanableRatio).toInt)
 
   /* a gauge for tracking the time since the last log cleaner run, in milli 
seconds */
   @volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds
-  newGauge("time-since-last-run-ms", () => Time.SYSTEM.milliseconds - 
timeOfLastRun)
+  metricsGroup.newGauge("time-since-last-run-ms", () => 
Time.SYSTEM.milliseconds - timeOfLastRun)
 
   /**
    * @return the position processed for all logs.
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index d2b9ec56f08..381872acd2f 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,7 +23,6 @@ import java.io._
 import java.nio.file.Files
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.ConfigRepository
 import kafka.server._
@@ -42,6 +41,7 @@ import org.apache.kafka.common.config.TopicConfig
 import java.util.Properties
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
 
@@ -76,10 +76,12 @@ class LogManager(logDirs: Seq[File],
                  brokerTopicStats: BrokerTopicStats,
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time,
-                 val keepPartitionMetadataFile: Boolean) extends Logging with 
KafkaMetricsGroup {
+                 val keepPartitionMetadataFile: Boolean) extends Logging {
 
   import LogManager._
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   val InitialTaskDelayMs = 30 * 1000
 
   private val logCreationOrDeletionLock = new Object
@@ -132,12 +134,12 @@ class LogManager(logDirs: Seq[File],
   @volatile private var _cleaner: LogCleaner = _
   private[kafka] def cleaner: LogCleaner = _cleaner
 
-  newGauge("OfflineLogDirectoryCount", () => offlineLogDirs.size)
+  metricsGroup.newGauge("OfflineLogDirectoryCount", () => offlineLogDirs.size)
 
   for (dir <- logDirs) {
-    newGauge("LogDirectoryOffline",
+    metricsGroup.newGauge("LogDirectoryOffline",
       () => if (_liveLogDirs.contains(dir)) 0 else 1,
-      Map("logDirectory" -> dir.getAbsolutePath))
+      Map("logDirectory" -> dir.getAbsolutePath).asJava)
   }
 
   /**
@@ -471,12 +473,12 @@ class LogManager(logDirs: Seq[File],
                                          numRemainingSegments: 
ConcurrentMap[String, Int]): Unit = {
     debug("Adding log recovery metrics")
     for (dir <- logDirs) {
-      newGauge("remainingLogsToRecover", () => 
numRemainingLogs.get(dir.getAbsolutePath),
-        Map("dir" -> dir.getAbsolutePath))
+      metricsGroup.newGauge("remainingLogsToRecover", () => 
numRemainingLogs.get(dir.getAbsolutePath),
+        Map("dir" -> dir.getAbsolutePath).asJava)
       for (i <- 0 until numRecoveryThreadsPerDataDir) {
         val threadName = logRecoveryThreadName(dir.getAbsolutePath, i)
-        newGauge("remainingSegmentsToRecover", () => 
numRemainingSegments.get(threadName),
-          Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString))
+        metricsGroup.newGauge("remainingSegmentsToRecover", () => 
numRemainingSegments.get(threadName),
+          Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString).asJava)
       }
     }
   }
@@ -484,9 +486,9 @@ class LogManager(logDirs: Seq[File],
   private[log] def removeLogRecoveryMetrics(): Unit = {
     debug("Removing log recovery metrics")
     for (dir <- logDirs) {
-      removeMetric("remainingLogsToRecover", Map("dir" -> dir.getAbsolutePath))
+      metricsGroup.removeMetric("remainingLogsToRecover", Map("dir" -> 
dir.getAbsolutePath).asJava)
       for (i <- 0 until numRecoveryThreadsPerDataDir) {
-        removeMetric("remainingSegmentsToRecover", Map("dir" -> 
dir.getAbsolutePath, "threadNum" -> i.toString))
+        metricsGroup.removeMetric("remainingSegmentsToRecover", Map("dir" -> 
dir.getAbsolutePath, "threadNum" -> i.toString).asJava)
       }
     }
   }
@@ -575,9 +577,9 @@ class LogManager(logDirs: Seq[File],
   def shutdown(): Unit = {
     info("Shutting down.")
 
-    removeMetric("OfflineLogDirectoryCount")
+    metricsGroup.removeMetric("OfflineLogDirectoryCount")
     for (dir <- logDirs) {
-      removeMetric("LogDirectoryOffline", Map("logDirectory" -> 
dir.getAbsolutePath))
+      metricsGroup.removeMetric("LogDirectoryOffline", Map("logDirectory" -> 
dir.getAbsolutePath).asJava)
     }
 
     val threadPools = ArrayBuffer.empty[ExecutorService]
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala 
b/core/src/main/scala/kafka/log/LogSegment.scala
index 7b2215de911..21a2a8c58fd 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -18,13 +18,13 @@ package kafka.log
 
 import com.yammer.metrics.core.Timer
 import kafka.common.LogSegmentOffsetOverflowException
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.apache.kafka.common.InvalidRecordException
 import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, 
TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, 
CompletedTxn, FetchDataInfo, LazyIndex, LogConfig, LogOffsetMetadata, 
OffsetIndex, OffsetPosition, ProducerStateManager, RollParams, TimeIndex, 
TimestampOffset, TransactionIndex, TxnIndexSearchResult}
 
@@ -688,6 +688,7 @@ object LogSegment {
   }
 }
 
-object LogFlushStats extends KafkaMetricsGroup {
-  val logFlushTimer: Timer = newTimer("LogFlushRateAndTimeMs", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+object LogFlushStats {
+  private val metricsGroup = new KafkaMetricsGroup(LogFlushStats.getClass)
+  val logFlushTimer: Timer = metricsGroup.newTimer("LogFlushRateAndTimeMs", 
TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
 }
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 1d001d87e3c..4463c2f2096 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -20,7 +20,6 @@ package kafka.log
 import com.yammer.metrics.core.MetricName
 import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerTopicMetrics, BrokerTopicStats, 
PartitionMetadataFile, RequestLocal}
 import kafka.utils._
 import org.apache.kafka.common.errors._
@@ -36,6 +35,7 @@ import org.apache.kafka.common.{InvalidRecordException, 
KafkaException, TopicPar
 import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
 import 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
@@ -44,6 +44,7 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, 
AppendOrigin, BatchMe
 
 import java.io.{File, IOException}
 import java.nio.file.Files
+import java.util
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.{Collections, Optional, OptionalInt, OptionalLong}
 import scala.annotation.nowarn
@@ -105,10 +106,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                  val keepPartitionMetadataFile: Boolean,
                  val remoteStorageSystemEnable: Boolean = false,
                  remoteLogManager: Option[RemoteLogManager] = None,
-                 @volatile private var logOffsetsListener: LogOffsetsListener 
= LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging with 
KafkaMetricsGroup {
+                 @volatile private var logOffsetsListener: LogOffsetsListener 
= LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
 
   import kafka.log.UnifiedLog._
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
+    // For compatibility, metrics are defined to be under `Log` class (a null package maps to "", as the previous override did)
+    override def metricName(name: String, tags: util.Map[String, String]): MetricName = {
+      KafkaMetricsGroup.explicitMetricName(Option(getClass.getPackage).fold("")(_.getName), "Log", name, tags)
+    }
+  }
+
   this.logIdent = s"[UnifiedLog partition=$topicPartition, dir=$parentDir] "
 
   /* A lock that guards all modifications to the log */
@@ -420,16 +428,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
 
-  private var metricNames: Map[String, Map[String, String]] = Map.empty
+  private var metricNames: Map[String, java.util.Map[String, String]] = 
Map.empty
 
   newMetrics()
   private[log] def newMetrics(): Unit = {
-    val tags = Map("topic" -> topicPartition.topic, "partition" -> 
topicPartition.partition.toString) ++
-      (if (isFuture) Map("is-future" -> "true") else Map.empty)
-    newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
-    newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
-    newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
-    newGauge(LogMetricNames.Size, () => size, tags)
+    val tags = (Map("topic" -> topicPartition.topic, "partition" -> 
topicPartition.partition.toString) ++
+      (if (isFuture) Map("is-future" -> "true") else Map.empty)).asJava
+    metricsGroup.newGauge(LogMetricNames.NumLogSegments, () => 
numberOfSegments, tags)
+    metricsGroup.newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, 
tags)
+    metricsGroup.newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, 
tags)
+    metricsGroup.newGauge(LogMetricNames.Size, () => size, tags)
     metricNames = Map(LogMetricNames.NumLogSegments -> tags,
       LogMetricNames.LogStartOffset -> tags,
       LogMetricNames.LogEndOffset -> tags,
@@ -447,13 +455,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
-  // For compatibility, metrics are defined to be under `Log` class
-  override def metricName(name: String, tags: scala.collection.Map[String, 
String]): MetricName = {
-    val pkg = getClass.getPackage
-    val pkgStr = if (pkg == null) "" else pkg.getName
-    explicitMetricName(pkgStr, "Log", name, tags)
-  }
-
   def loadProducerState(lastOffset: Long): Unit = lock synchronized {
     rebuildProducerState(lastOffset, producerStateManager)
     maybeIncrementFirstUnstableOffset()
@@ -1699,7 +1700,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   private[log] def removeLogMetrics(): Unit = {
     metricNames.foreach {
-      case (name, tags) => removeMetric(name, tags)
+      case (name, tags) => metricsGroup.removeMetric(name, tags)
     }
     metricNames = Map.empty
   }
diff --git a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala 
b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
index 8394c74afbd..a0fa0058000 100644
--- a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
+++ b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
@@ -17,7 +17,6 @@
 package kafka.log.remote
 
 import kafka.cluster.Partition
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common._
@@ -48,7 +47,7 @@ import scala.jdk.CollectionConverters._
  */
 class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
                        brokerId: Int,
-                       logDir: String) extends Logging with Closeable with 
KafkaMetricsGroup {
+                       logDir: String) extends Logging with Closeable {
 
   // topic ids received on leadership changes
   private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new 
ConcurrentHashMap[TopicPartition, Uuid]()
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala 
b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
deleted file mode 100644
index 8a9ed89f3fc..00000000000
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.metrics
-
-import java.util.concurrent.TimeUnit
-
-import com.yammer.metrics.core.{Gauge, Histogram, Meter, MetricName, Timer}
-import kafka.utils.Logging
-import org.apache.kafka.common.utils.Sanitizer
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-
-trait KafkaMetricsGroup extends Logging {
-
-  /**
-   * Creates a new MetricName object for gauges, meters, etc. created for this
-   * metrics group.
-   * @param name Descriptive name of the metric.
-   * @param tags Additional attributes which mBean will have.
-   * @return Sanitized metric name object.
-   */
-  def metricName(name: String, tags: scala.collection.Map[String, String]): 
MetricName = {
-    val klass = this.getClass
-    val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
-    val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
-
-    explicitMetricName(pkg, simpleName, name, tags)
-  }
-
-
-  def explicitMetricName(group: String, typeName: String, name: String,
-                                   tags: scala.collection.Map[String, 
String]): MetricName = {
-
-    val nameBuilder: StringBuilder = new StringBuilder
-
-    nameBuilder.append(group)
-
-    nameBuilder.append(":type=")
-
-    nameBuilder.append(typeName)
-
-    if (name.nonEmpty) {
-      nameBuilder.append(",name=")
-      nameBuilder.append(name)
-    }
-
-    val scope: String = toScope(tags).orNull
-    val tagsName = toMBeanName(tags)
-    tagsName.foreach(nameBuilder.append(",").append(_))
-
-    new MetricName(group, typeName, name, scope, nameBuilder.toString)
-  }
-
-  def newGauge[T](name: String, metric: Gauge[T], tags: 
scala.collection.Map[String, String] = Map.empty): Gauge[T] =
-    KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), 
metric)
-
-  def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: 
scala.collection.Map[String, String] = Map.empty): Meter =
-    KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, tags), 
eventType, timeUnit)
-
-  def newMeter(metricName: MetricName, eventType: String, timeUnit: TimeUnit): 
Meter =
-    KafkaYammerMetrics.defaultRegistry().newMeter(metricName, eventType, 
timeUnit)
-
-  def newHistogram(name: String, biased: Boolean = true, tags: 
scala.collection.Map[String, String] = Map.empty): Histogram =
-    KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags), 
biased)
-
-  def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: 
scala.collection.Map[String, String] = Map.empty): Timer =
-    KafkaYammerMetrics.defaultRegistry().newTimer(metricName(name, tags), 
durationUnit, rateUnit)
-
-  def removeMetric(name: String, tags: scala.collection.Map[String, String] = 
Map.empty): Unit =
-    KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name, tags))
-
-  private def toMBeanName(tags: collection.Map[String, String]): 
Option[String] = {
-    val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
-    if (filteredTags.nonEmpty) {
-      val tagsString = filteredTags.map { case (key, value) => 
"%s=%s".format(key, Sanitizer.jmxSanitize(value)) }.mkString(",")
-      Some(tagsString)
-    }
-    else None
-  }
-
-  private def toScope(tags: collection.Map[String, String]): Option[String] = {
-    val filteredTags = tags.filter { case (_, tagValue) => tagValue != ""}
-    if (filteredTags.nonEmpty) {
-      // convert dot to _ since reporters like Graphite typically use dot to 
represent hierarchy
-      val tagsString = filteredTags
-        .toList.sortBy(_._1)
-        .map { case (key, value) => "%s.%s".format(key, 
value.replaceAll("\\.", "_"))}
-        .mkString(".")
-
-      Some(tagsString)
-    }
-    else None
-  }
-
-}
-
-object KafkaMetricsGroup extends KafkaMetricsGroup
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 8686d15a1d3..0c5cc373f5d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -23,7 +23,6 @@ import java.util.concurrent._
 import com.fasterxml.jackson.databind.JsonNode
 import com.typesafe.scalalogging.Logger
 import com.yammer.metrics.core.Meter
-import kafka.metrics.KafkaMetricsGroup
 import kafka.network
 import kafka.server.KafkaConfig
 import kafka.utils.{Logging, NotNothing, Pool}
@@ -37,7 +36,9 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
+import java.util
 import scala.annotation.nowarn
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -344,16 +345,19 @@ object RequestChannel extends Logging {
 class RequestChannel(val queueSize: Int,
                      val metricNamePrefix: String,
                      time: Time,
-                     val metrics: RequestChannel.Metrics) extends 
KafkaMetricsGroup {
+                     val metrics: RequestChannel.Metrics) {
   import RequestChannel._
+
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
   val requestQueueSizeMetricName = 
metricNamePrefix.concat(RequestQueueSizeMetric)
   val responseQueueSizeMetricName = 
metricNamePrefix.concat(ResponseQueueSizeMetric)
 
-  newGauge(requestQueueSizeMetricName, () => requestQueue.size)
+  metricsGroup.newGauge(requestQueueSizeMetricName, () => requestQueue.size)
 
-  newGauge(responseQueueSizeMetricName, () => {
+  metricsGroup.newGauge(responseQueueSizeMetricName, () => {
     processors.values.asScala.foldLeft(0) {(total, processor) =>
       total + processor.responseQueueSize
     }
@@ -363,13 +367,13 @@ class RequestChannel(val queueSize: Int,
     if (processors.putIfAbsent(processor.id, processor) != null)
       warn(s"Unexpected processor with processorId ${processor.id}")
 
-    newGauge(responseQueueSizeMetricName, () => processor.responseQueueSize,
-      Map(ProcessorMetricTag -> processor.id.toString))
+    metricsGroup.newGauge(responseQueueSizeMetricName, () => 
processor.responseQueueSize,
+      Map(ProcessorMetricTag -> processor.id.toString).asJava)
   }
 
   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
-    removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> 
processorId.toString))
+    metricsGroup.removeMetric(responseQueueSizeMetricName, 
Map(ProcessorMetricTag -> processorId.toString).asJava)
   }
 
   /** Send a request to be handled, potentially blocking until there is room 
in the queue for the request */
@@ -497,51 +501,59 @@ object RequestMetrics {
   val ErrorsPerSec = "ErrorsPerSec"
 }
 
-class RequestMetrics(name: String) extends KafkaMetricsGroup {
+class RequestMetrics(name: String) {
 
   import RequestMetrics._
 
-  val tags = Map("request" -> name)
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
+  val tags = Map("request" -> name).asJava
   val requestRateInternal = new Pool[Short, Meter]()
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, 
tags)
+  val requestQueueTimeHist = metricsGroup.newHistogram(RequestQueueTimeMs, 
true, tags)
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags)
+  val localTimeHist = metricsGroup.newHistogram(LocalTimeMs, true, tags)
   // time a request takes to wait on remote brokers (currently only relevant 
to fetch and produce requests)
-  val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags)
+  val remoteTimeHist = metricsGroup.newHistogram(RemoteTimeMs, true, tags)
   // time a request is throttled, not part of the request processing time 
(throttling is done at the client level
   // for clients that support KIP-219 and by muting the channel for the rest)
-  val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags)
+  val throttleTimeHist = metricsGroup.newHistogram(ThrottleTimeMs, true, tags)
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, 
tags)
+  val responseQueueTimeHist = metricsGroup.newHistogram(ResponseQueueTimeMs, 
true, tags)
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, 
tags)
-  val totalTimeHist = newHistogram(TotalTimeMs, biased = true, tags)
+  val responseSendTimeHist = metricsGroup.newHistogram(ResponseSendTimeMs, 
true, tags)
+  val totalTimeHist = metricsGroup.newHistogram(TotalTimeMs, true, tags)
   // request size in bytes
-  val requestBytesHist = newHistogram(RequestBytes, biased = true, tags)
+  val requestBytesHist = metricsGroup.newHistogram(RequestBytes, true, tags)
   // time for message conversions (only relevant to fetch and produce requests)
   val messageConversionsTimeHist =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
-      Some(newHistogram(MessageConversionsTimeMs, biased = true, tags))
+      Some(metricsGroup.newHistogram(MessageConversionsTimeMs, true, tags))
     else
       None
   // Temporary memory allocated for processing request (only populated for 
fetch and produce requests)
   // This shows the memory allocated for compression/conversions excluding the 
actual request size
   val tempMemoryBytesHist =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
-      Some(newHistogram(TemporaryMemoryBytes, biased = true, tags))
+      Some(metricsGroup.newHistogram(TemporaryMemoryBytes, true, tags))
     else
       None
 
   private val errorMeters = mutable.Map[Errors, ErrorMeter]()
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, 
error)))
 
-  def requestRate(version: Short): Meter = {
-    requestRateInternal.getAndMaybePut(version, newMeter(RequestsPerSec, 
"requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+  def requestRate(version: Short): Meter =
+    requestRateInternal.getAndMaybePut(version, 
metricsGroup.newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, 
tagsWithVersion(version)))
+
+  private def tagsWithVersion(version: Short): java.util.Map[String, String] = {
+    val nameAndVersionTags = new util.LinkedHashMap[String, String](tags.size() + 1) // insertion-ordered: request tags first, then "version"
+    nameAndVersionTags.putAll(tags)
+    nameAndVersionTags.put("version", version.toString)
+    nameAndVersionTags
   }
 
   class ErrorMeter(name: String, error: Errors) {
-    private val tags = Map("request" -> name, "error" -> error.name)
+    private val tags = Map("request" -> name, "error" -> error.name).asJava
 
     @volatile private var meter: Meter = _
 
@@ -551,7 +563,7 @@ class RequestMetrics(name: String) extends 
KafkaMetricsGroup {
       else {
         synchronized {
           if (meter == null)
-             meter = newMeter(ErrorsPerSec, "requests", TimeUnit.SECONDS, tags)
+             meter = metricsGroup.newMeter(ErrorsPerSec, "requests", 
TimeUnit.SECONDS, tags)
           meter
         }
       }
@@ -560,7 +572,7 @@ class RequestMetrics(name: String) extends 
KafkaMetricsGroup {
     def removeMeter(): Unit = {
       synchronized {
         if (meter != null) {
-          removeMetric(ErrorsPerSec, tags)
+          metricsGroup.removeMetric(ErrorsPerSec, tags)
           meter = null
         }
       }
@@ -572,19 +584,21 @@ class RequestMetrics(name: String) extends 
KafkaMetricsGroup {
   }
 
   def removeMetrics(): Unit = {
-    for (version <- requestRateInternal.keys) removeMetric(RequestsPerSec, 
tags + ("version" -> version.toString))
-    removeMetric(RequestQueueTimeMs, tags)
-    removeMetric(LocalTimeMs, tags)
-    removeMetric(RemoteTimeMs, tags)
-    removeMetric(RequestsPerSec, tags)
-    removeMetric(ThrottleTimeMs, tags)
-    removeMetric(ResponseQueueTimeMs, tags)
-    removeMetric(TotalTimeMs, tags)
-    removeMetric(ResponseSendTimeMs, tags)
-    removeMetric(RequestBytes, tags)
+    for (version <- requestRateInternal.keys) {
+      metricsGroup.removeMetric(RequestsPerSec, tagsWithVersion(version))
+    }
+    metricsGroup.removeMetric(RequestQueueTimeMs, tags)
+    metricsGroup.removeMetric(LocalTimeMs, tags)
+    metricsGroup.removeMetric(RemoteTimeMs, tags)
+    metricsGroup.removeMetric(RequestsPerSec, tags)
+    metricsGroup.removeMetric(ThrottleTimeMs, tags)
+    metricsGroup.removeMetric(ResponseQueueTimeMs, tags)
+    metricsGroup.removeMetric(TotalTimeMs, tags)
+    metricsGroup.removeMetric(ResponseSendTimeMs, tags)
+    metricsGroup.removeMetric(RequestBytes, tags)
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) {
-      removeMetric(MessageConversionsTimeMs, tags)
-      removeMetric(TemporaryMemoryBytes, tags)
+      metricsGroup.removeMetric(MessageConversionsTimeMs, tags)
+      metricsGroup.removeMetric(TemporaryMemoryBytes, tags)
     }
     errorMeters.values.foreach(_.removeMeter())
     errorMeters.clear()
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 6807a2a82fc..feca5fc68b4 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -25,7 +25,6 @@ import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.cluster.{BrokerEndPoint, EndPoint}
-import kafka.metrics.KafkaMetricsGroup
 import kafka.network.ConnectionQuotas._
 import kafka.network.Processor._
 import kafka.network.RequestChannel.{CloseConnectionResponse, 
EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
@@ -47,6 +46,7 @@ import org.apache.kafka.common.requests.{ApiVersionsRequest, 
RequestContext, Req
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, 
Reconfigurable}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.FutureUtils
 import org.slf4j.event.Level
 
@@ -78,7 +78,9 @@ class SocketServer(val config: KafkaConfig,
                    val time: Time,
                    val credentialProvider: CredentialProvider,
                    val apiVersionManager: ApiVersionManager)
-  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
+  extends Logging with BrokerReconfigurable {
+
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
@@ -115,7 +117,7 @@ class SocketServer(val config: KafkaConfig,
   private var stopped = false
 
   // Socket server metrics
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
+  
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
 () => SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
     val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
       metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
@@ -129,7 +131,7 @@ class SocketServer(val config: KafkaConfig,
     }
   })
   if (config.requiresZookeeper) {
-    
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", 
() => SocketServer.this.synchronized {
+    
metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
 () => SocketServer.this.synchronized {
       val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => 
a.processors(0))
       val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
         metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
@@ -139,9 +141,9 @@ class SocketServer(val config: KafkaConfig,
       }.getOrElse(Double.NaN)
     })
   }
-  newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
-  newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
-  newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => SocketServer.this.synchronized {
+  metricsGroup.newGauge("MemoryPoolAvailable", () => 
memoryPool.availableMemory)
+  metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
+  
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
 () => SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
     val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p 
=>
       metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
@@ -151,7 +153,7 @@ class SocketServer(val config: KafkaConfig,
     }.sum
   })
   if (config.requiresZookeeper) {
-    
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", 
() => SocketServer.this.synchronized {
+    
metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
 () => SocketServer.this.synchronized {
       val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => 
a.processors(0))
       val expiredConnectionsKilledCountMetricNames = 
controlPlaneProcessorOpt.map { p =>
         metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
@@ -579,7 +581,9 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
                                        logContext: LogContext,
                                        memoryPool: MemoryPool,
                                        apiVersionManager: ApiVersionManager)
-  extends Runnable with Logging with KafkaMetricsGroup {
+  extends Runnable with Logging {
+
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   val shouldRun = new AtomicBoolean(true)
 
@@ -608,12 +612,12 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
 
   private[network] val processors = new ArrayBuffer[Processor]()
   // Build the metric name explicitly in order to keep the existing name for 
compatibility
-  private val blockedPercentMeterMetricName = explicitMetricName(
+  private val blockedPercentMeterMetricName = 
KafkaMetricsGroup.explicitMetricName(
     "kafka.network",
     "Acceptor",
     s"${metricPrefix()}AcceptorBlockedPercent",
-    Map(ListenerMetricTag -> endPoint.listenerName.value))
-  private val blockedPercentMeter = 
newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
+    Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
+  private val blockedPercentMeter = 
metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", 
TimeUnit.NANOSECONDS)
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new 
mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
@@ -911,7 +915,9 @@ private[kafka] class Processor(
   isPrivilegedListener: Boolean,
   apiVersionManager: ApiVersionManager,
   threadName: String
-) extends Runnable with KafkaMetricsGroup {
+) extends Runnable with Logging {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   val shouldRun = new AtomicBoolean(true)
 
   val thread = KafkaThread.nonDaemon(threadName, this)
@@ -940,13 +946,13 @@ private[kafka] class Processor(
     NetworkProcessorMetricTag -> id.toString
   ).asJava
 
-  newGauge(IdlePercentMetricName, () => {
+  metricsGroup.newGauge(IdlePercentMetricName, () => {
     Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup, 
metricTags))).fold(0.0)(m =>
       Math.min(m.metricValue.asInstanceOf[Double], 1.0))
   },
     // for compatibility, only add a networkProcessor tag to the Yammer 
Metrics alias (the equivalent Selector metric
     // also includes the listener name)
-    Map(NetworkProcessorMetricTag -> id.toString)
+    Map(NetworkProcessorMetricTag -> id.toString).asJava
   )
 
   val expiredConnectionsKilledCount = new CumulativeSum()
@@ -1299,7 +1305,7 @@ private[kafka] class Processor(
       close(channel.id)
     }
     selector.close()
-    removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> 
id.toString))
+    metricsGroup.removeMetric(IdlePercentMetricName, 
Map(NetworkProcessorMetricTag -> id.toString).asJava)
   }
 
   // 'protected` to allow override for testing
@@ -1367,7 +1373,7 @@ private[kafka] class Processor(
       beginShutdown()
       thread.join()
     } finally {
-      removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+      metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" -> 
id.toString).asJava)
       metrics.removeMetric(expiredConnectionsKilledCountMetricName)
     }
   }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index ddc45693f87..eced213734b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -18,16 +18,19 @@
 package kafka.server
 
 import kafka.cluster.BrokerEndPoint
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import scala.collection.{Map, Set, mutable}
+import scala.jdk.CollectionConverters._
 
 abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: 
String, clientId: String, numFetchers: Int)
-  extends Logging with KafkaMetricsGroup {
+  extends Logging {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   // map of (source broker_id, fetcher_id per source broker) => fetcher.
   // package private for test
   private[server] val fetcherThreadMap = new 
mutable.HashMap[BrokerIdAndFetcherId, T]
@@ -36,9 +39,9 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
   val failedPartitions = new FailedPartitions
   this.logIdent = "[" + name + "] "
 
-  private val tags = Map("clientId" -> clientId)
+  private val tags = Map("clientId" -> clientId).asJava
 
-  newGauge("MaxLag", () => {
+  metricsGroup.newGauge("MaxLag", () => {
     // current max lag across all fetchers/topics/partitions
     fetcherThreadMap.values.foldLeft(0L) { (curMaxLagAll, fetcherThread) =>
       val maxLagThread = 
fetcherThread.fetcherLagStats.stats.values.foldLeft(0L)((curMaxLagThread, 
lagMetrics) =>
@@ -47,16 +50,16 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
     }
   }, tags)
 
-  newGauge("MinFetchRate", () => {
+  metricsGroup.newGauge("MinFetchRate", () => {
     // current min fetch rate across all fetchers/topics/partitions
     val headRate = 
fetcherThreadMap.values.headOption.map(_.fetcherStats.requestRate.oneMinuteRate).getOrElse(0.0)
     fetcherThreadMap.values.foldLeft(headRate)((curMinAll, fetcherThread) =>
       math.min(curMinAll, 
fetcherThread.fetcherStats.requestRate.oneMinuteRate))
   }, tags)
 
-  newGauge("FailedPartitionsCount", () => failedPartitions.size, tags)
+  metricsGroup.newGauge("FailedPartitionsCount", () => failedPartitions.size, 
tags)
 
-  newGauge("DeadThreadCount", () => deadThreadCount, tags)
+  metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount, tags)
 
   private[server] def deadThreadCount: Int = lock synchronized { 
fetcherThreadMap.values.count(_.isThreadFailed) }
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2176ee3518b..a1c5d0ed2fa 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.common.ClientIdAndBroker
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.Implicits._
@@ -34,6 +33,7 @@ import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.OffsetAndEpoch
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.storage.internals.log.LogAppendInfo
 
@@ -858,15 +858,16 @@ object FetcherMetrics {
   val BytesPerSec = "BytesPerSec"
 }
 
-class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends 
KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   private[this] val lagVal = new AtomicLong(-1L)
   private[this] val tags = Map(
     "clientId" -> metricId.clientId,
     "topic" -> metricId.topicPartition.topic,
-    "partition" -> metricId.topicPartition.partition.toString)
+    "partition" -> metricId.topicPartition.partition.toString).asJava
 
-  newGauge(FetcherMetrics.ConsumerLag, () => lagVal.get, tags)
+  metricsGroup.newGauge(FetcherMetrics.ConsumerLag, () => lagVal.get, tags)
 
   def lag_=(newLag: Long): Unit = {
     lagVal.set(newLag)
@@ -875,7 +876,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) 
extends KafkaMetricsGr
   def lag = lagVal.get
 
   def unregister(): Unit = {
-    removeMetric(FetcherMetrics.ConsumerLag, tags)
+    metricsGroup.removeMetric(FetcherMetrics.ConsumerLag, tags)
   }
 }
 
@@ -899,18 +900,20 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
   }
 }
 
-class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
+class FetcherStats(metricId: ClientIdAndBroker) {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   val tags = Map("clientId" -> metricId.clientId,
     "brokerHost" -> metricId.brokerHost,
-    "brokerPort" -> metricId.brokerPort.toString)
+    "brokerPort" -> metricId.brokerPort.toString).asJava
 
-  val requestRate = newMeter(FetcherMetrics.RequestsPerSec, "requests", 
TimeUnit.SECONDS, tags)
+  val requestRate = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, 
"requests", TimeUnit.SECONDS, tags)
 
-  val byteRate = newMeter(FetcherMetrics.BytesPerSec, "bytes", 
TimeUnit.SECONDS, tags)
+  val byteRate = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes", 
TimeUnit.SECONDS, tags)
 
   def unregister(): Unit = {
-    removeMetric(FetcherMetrics.RequestsPerSec, tags)
-    removeMetric(FetcherMetrics.BytesPerSec, tags)
+    metricsGroup.removeMetric(FetcherMetrics.RequestsPerSec, tags)
+    metricsGroup.removeMetric(FetcherMetrics.BytesPerSec, tags)
   }
 
 }
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala 
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 9f844e1958a..f078815734d 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -20,7 +20,6 @@ import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import kafka.api.LeaderAndIsr
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
@@ -123,7 +122,7 @@ class DefaultAlterPartitionManager(
   val brokerId: Int,
   val brokerEpochSupplier: () => Long,
   val metadataVersionSupplier: () => MetadataVersion
-) extends AlterPartitionManager with Logging with KafkaMetricsGroup {
+) extends AlterPartitionManager with Logging {
 
   // Used to allow only one pending ISR update per partition (visible for 
testing).
   // Note that we key items by TopicPartition despite using TopicIdPartition 
while
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index b1a15d97741..c231486e41e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import kafka.cluster.Broker.ServerInfo
-import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
+import kafka.metrics.LinuxIoMetricsCollector
 import kafka.migration.MigrationPropagator
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
@@ -44,7 +44,7 @@ import 
org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagat
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 import org.apache.kafka.server.util.{Deadline, FutureUtils}
 
@@ -80,10 +80,12 @@ class ControllerServer(
   val sharedServer: SharedServer,
   val configSchema: KafkaConfigSchema,
   val bootstrapMetadata: BootstrapMetadata,
-) extends Logging with KafkaMetricsGroup {
+) extends Logging {
 
   import kafka.server.Server._
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   val config = sharedServer.controllerConfig
   val time = sharedServer.time
   def metrics = sharedServer.metrics
@@ -134,13 +136,13 @@ class ControllerServer(
       maybeChangeStatus(STARTING, STARTED)
       this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] 
").logPrefix()
 
-      newGauge("ClusterId", () => clusterId)
-      newGauge("yammer-metrics-count", () =>  
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+      metricsGroup.newGauge("ClusterId", () => clusterId)
+      metricsGroup.newGauge("yammer-metrics-count", () =>  
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
 
       linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, 
logger.underlying)
       if (linuxIoMetricsCollector.usable()) {
-        newGauge("linux-disk-read-bytes", () => 
linuxIoMetricsCollector.readBytes())
-        newGauge("linux-disk-write-bytes", () => 
linuxIoMetricsCollector.writeBytes())
+        metricsGroup.newGauge("linux-disk-read-bytes", () => 
linuxIoMetricsCollector.readBytes())
+        metricsGroup.newGauge("linux-disk-write-bytes", () => 
linuxIoMetricsCollector.writeBytes())
       }
 
       val javaListeners = config.controllerListeners.map(_.toJava).asJava
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala 
b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 317d0b89c37..ef0eceeef94 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,12 +20,12 @@ package kafka.server
 
 import java.util.concurrent.TimeUnit
 
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Implicits._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.DeleteRecordsResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.DeleteRecordsResponse
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import scala.collection._
 
@@ -122,12 +122,12 @@ class DelayedDeleteRecords(delayMs: Long,
   }
 }
 
-object DelayedDeleteRecordsMetrics extends KafkaMetricsGroup {
+object DelayedDeleteRecordsMetrics {
+  private val metricsGroup = new 
KafkaMetricsGroup(DelayedDeleteRecordsMetrics.getClass)
 
-  private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", 
TimeUnit.SECONDS)
+  private val aggregateExpirationMeter = 
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
 
   def recordExpiration(partition: TopicPartition): Unit = {
     aggregateExpirationMeter.mark()
   }
 }
-
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala 
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 9106cff910c..423b7bcd223 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -18,15 +18,16 @@
 package kafka.server
 
 import java.util.concurrent.TimeUnit
-import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, 
FetchPartitionData, LogOffsetMetadata}
 
 import scala.collection._
+import scala.jdk.CollectionConverters._
 
 case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, 
fetchInfo: PartitionData) {
 
@@ -180,9 +181,10 @@ class DelayedFetch(
   }
 }
 
-object DelayedFetchMetrics extends KafkaMetricsGroup {
+object DelayedFetchMetrics {
+  private val metricsGroup = new 
KafkaMetricsGroup(DelayedFetchMetrics.getClass)
   private val FetcherTypeKey = "fetcherType"
-  val followerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", 
TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "follower"))
-  val consumerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", 
TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "consumer"))
+  val followerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", 
"requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
+  val consumerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec", 
"requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
 }
 
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala 
b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 8b2d52fcd31..3bdd00b22e5 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -20,14 +20,15 @@ package kafka.server
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.util.concurrent.locks.{Lock, ReentrantLock}
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import kafka.utils.timer._
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
 
 import scala.collection._
 import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._
 
 /**
  * An operation whose processing needs to be delayed for at most the given 
delayMs. For example
@@ -153,7 +154,9 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
                                                              purgeInterval: 
Int = 1000,
                                                              reaperEnabled: 
Boolean = true,
                                                              timerEnabled: 
Boolean = true)
-        extends Logging with KafkaMetricsGroup {
+        extends Logging {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   /* a list of operation watching keys */
   private class WatcherList {
     val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new 
Watchers(key)))
@@ -180,9 +183,9 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
   /* background thread expiring operations that have timed out */
   private val expirationReaper = new ExpiredOperationReaper()
 
-  private val metricsTags = Map("delayedOperation" -> purgatoryName)
-  newGauge("PurgatorySize", () => watched, metricsTags)
-  newGauge("NumDelayedOperations", () => numDelayed, metricsTags)
+  private val metricsTags = Map("delayedOperation" -> purgatoryName).asJava
+  metricsGroup.newGauge("PurgatorySize", () => watched, metricsTags)
+  metricsGroup.newGauge("NumDelayedOperations", () => numDelayed, metricsTags)
 
   if (reaperEnabled)
     expirationReaper.start()
@@ -338,8 +341,8 @@ final class DelayedOperationPurgatory[T <: 
DelayedOperation](purgatoryName: Stri
       expirationReaper.awaitShutdown()
     }
     timeoutTimer.shutdown()
-    removeMetric("PurgatorySize", metricsTags)
-    removeMetric("NumDelayedOperations", metricsTags)
+    metricsGroup.removeMetric("PurgatorySize", metricsTags)
+    metricsGroup.removeMetric("NumDelayedOperations", metricsTags)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala 
b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 5e7e7bf385c..48a0ccbcff4 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -21,14 +21,15 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Meter
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Implicits._
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import scala.collection._
+import scala.jdk.CollectionConverters._
 
 case class ProducePartitionStatus(requiredOffset: Long, responseStatus: 
PartitionResponse) {
   @volatile var acksPending = false
@@ -129,15 +130,16 @@ class DelayedProduce(delayMs: Long,
   }
 }
 
-object DelayedProduceMetrics extends KafkaMetricsGroup {
+object DelayedProduceMetrics {
+  private val metricsGroup = new 
KafkaMetricsGroup(DelayedProduceMetrics.getClass)
 
-  private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", 
TimeUnit.SECONDS)
+  private val aggregateExpirationMeter = 
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
 
   private val partitionExpirationMeterFactory = (key: TopicPartition) =>
-    newMeter("ExpiresPerSec",
+    metricsGroup.newMeter("ExpiresPerSec",
              "requests",
              TimeUnit.SECONDS,
-             tags = Map("topic" -> key.topic, "partition" -> 
key.partition.toString))
+             Map("topic" -> key.topic, "partition" -> 
key.partition.toString).asJava)
   private val partitionExpirationMeters = new Pool[TopicPartition, 
Meter](valueFactory = Some(partitionExpirationMeterFactory))
 
   def recordExpiration(partition: TopicPartition): Unit = {
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala 
b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index a715718c420..62bd83c72c5 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -25,7 +25,6 @@ import java.util.Base64
 import javax.crypto.spec.SecretKeySpec
 import javax.crypto.{Mac, SecretKey}
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{CoreUtils, Json, Logging}
 import kafka.zk.{DelegationTokenChangeNotificationSequenceZNode, 
DelegationTokenChangeNotificationZNode, DelegationTokensZNode, KafkaZkClient}
 import org.apache.kafka.common.protocol.Errors
@@ -167,7 +166,7 @@ object DelegationTokenManager {
 class DelegationTokenManager(val config: KafkaConfig,
                              val tokenCache: DelegationTokenCache,
                              val time: Time,
-                             val zkClient: KafkaZkClient) extends Logging with 
KafkaMetricsGroup {
+                             val zkClient: KafkaZkClient) extends Logging {
   this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "
 
   import DelegationTokenManager._
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala 
b/core/src/main/scala/kafka/server/FetchSession.scala
index b233a5ae69e..8725a09f090 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Logging
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.message.FetchResponseData
@@ -25,11 +24,12 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, 
INITIAL_EPOCH, INVALID_SESSION_ID}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, 
FetchMetadata => JFetchMetadata}
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time, 
Utils}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import java.util
-import java.util.Optional
+import java.util.{Collections, Optional}
 import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
 
-import scala.collection.{mutable, _}
+import scala.collection.mutable
 import scala.math.Ordered.orderingToOrdered
 
 object FetchSession {
@@ -564,7 +564,9 @@ case class EvictableKey(privileged: Boolean, size: Int, id: 
Int) extends Compara
   * @param evictionMs The minimum time that an entry must be unused in order 
to be evictable.
   */
 class FetchSessionCache(private val maxEntries: Int,
-                        private val evictionMs: Long) extends Logging with 
KafkaMetricsGroup {
+                        private val evictionMs: Long) extends Logging {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   private var numPartitions: Long = 0
 
   // A map of session ID to FetchSession.
@@ -581,13 +583,13 @@ class FetchSessionCache(private val maxEntries: Int,
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, 
FetchSession]
 
   // Set up metrics.
-  removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => 
FetchSessionCache.this.size)
-  removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => 
FetchSessionCache.this.totalPartitions)
-  removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
-  private[server] val evictionsMeter = 
newMeter(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC,
-    FetchSession.EVICTIONS, TimeUnit.SECONDS, Map.empty)
+  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
+  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => 
FetchSessionCache.this.size)
+  
metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
+  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, 
() => FetchSessionCache.this.totalPartitions)
+  
metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
+  private[server] val evictionsMeter = 
metricsGroup.newMeter(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC,
+    FetchSession.EVICTIONS, TimeUnit.SECONDS, Collections.emptyMap())
 
   /**
     * Get a session by session ID.
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala 
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index db5da9bd906..cb66a9eb089 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -19,9 +19,10 @@ package kafka.server
 
 import com.yammer.metrics.core.MetricName
 import kafka.log.LogManager
-import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
+import kafka.metrics.LinuxIoMetricsCollector
 import kafka.network.SocketServer
 import kafka.security.CredentialProvider
+import kafka.utils.Logging
 import org.apache.kafka.common.ClusterResource
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
@@ -30,9 +31,10 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.Scheduler
 
+import java.util
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
@@ -66,7 +68,7 @@ object KafkaBroker {
   val STARTED_MESSAGE = "Kafka Server started"
 }
 
-trait KafkaBroker extends KafkaMetricsGroup {
+trait KafkaBroker extends Logging {
 
   def authorizer: Option[Authorizer]
   def brokerState: BrokerState
@@ -91,20 +93,22 @@ trait KafkaBroker extends KafkaMetricsGroup {
   def credentialProvider: CredentialProvider
   def clientToControllerChannelManager: BrokerToControllerChannelManager
 
-  // For backwards compatibility, we need to keep older metrics tied
-  // to their original name when this class was named `KafkaServer`
-  override def metricName(name: String, metricTags: 
scala.collection.Map[String, String]): MetricName = {
-    explicitMetricName(Server.MetricsPrefix, KafkaBroker.MetricsTypeName, 
name, metricTags)
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
+    // For backwards compatibility, we need to keep older metrics tied
+    // to their original name when this class was named `KafkaServer`
+    override def metricName(name: String, tags: util.Map[String, String]): 
MetricName = {
+      KafkaMetricsGroup.explicitMetricName(Server.MetricsPrefix, 
KafkaBroker.MetricsTypeName, name, tags)
+    }
   }
 
-  newGauge("BrokerState", () => brokerState.value)
-  newGauge("ClusterId", () => clusterId)
-  newGauge("yammer-metrics-count", () =>  
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+  metricsGroup.newGauge("BrokerState", () => brokerState.value)
+  metricsGroup.newGauge("ClusterId", () => clusterId)
+  metricsGroup.newGauge("yammer-metrics-count", () =>  
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
 
   private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", 
Time.SYSTEM, logger.underlying)
 
   if (linuxIoMetricsCollector.usable()) {
-    newGauge("linux-disk-read-bytes", () => 
linuxIoMetricsCollector.readBytes())
-    newGauge("linux-disk-write-bytes", () => 
linuxIoMetricsCollector.writeBytes())
+    metricsGroup.newGauge("linux-disk-read-bytes", () => 
linuxIoMetricsCollector.readBytes())
+    metricsGroup.newGauge("linux-disk-write-bytes", () => 
linuxIoMetricsCollector.writeBytes())
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 4d38c6efc3f..bd5ea797fe2 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -19,14 +19,15 @@ package kafka.server
 
 import kafka.network._
 import kafka.utils._
-import kafka.metrics.KafkaMetricsGroup
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 import com.yammer.metrics.core.Meter
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.utils.{KafkaThread, Time}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
+import java.util.Collections
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -109,11 +110,12 @@ class KafkaRequestHandlerPool(val brokerId: Int,
                               time: Time,
                               numThreads: Int,
                               requestHandlerAvgIdleMetricName: String,
-                              logAndThreadNamePrefix : String) extends Logging 
with KafkaMetricsGroup {
+                              logAndThreadNamePrefix : String) extends Logging 
{
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
   /* a meter to track the average free capacity of the request handlers */
-  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, 
"percent", TimeUnit.NANOSECONDS)
+  private val aggregateIdleMeter = 
metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", 
TimeUnit.NANOSECONDS)
 
   this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on 
Broker " + brokerId + "], "
   val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
@@ -151,10 +153,12 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   }
 }
 
-class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
-  val tags: scala.collection.Map[String, String] = name match {
-    case None => Map.empty
-    case Some(topic) => Map("topic" -> topic)
+class BrokerTopicMetrics(name: Option[String]) {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
+  val tags: java.util.Map[String, String] = name match {
+    case None => Collections.emptyMap()
+    case Some(topic) => Map("topic" -> topic).asJava
   }
 
   case class MeterWrapper(metricType: String, eventType: String) {
@@ -167,7 +171,7 @@ class BrokerTopicMetrics(name: Option[String]) extends 
KafkaMetricsGroup {
         meterLock synchronized {
           meter = lazyMeter
           if (meter == null) {
-            meter = newMeter(metricType, eventType, TimeUnit.SECONDS, tags)
+            meter = metricsGroup.newMeter(metricType, eventType, 
TimeUnit.SECONDS, tags)
             lazyMeter = meter
           }
         }
@@ -177,7 +181,7 @@ class BrokerTopicMetrics(name: Option[String]) extends 
KafkaMetricsGroup {
 
     def close(): Unit = meterLock synchronized {
       if (lazyMeter != null) {
-        removeMetric(metricType, tags)
+        metricsGroup.removeMetric(metricType, tags)
         lazyMeter = null
       }
     }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4ef13903983..1fd21104208 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -22,7 +22,6 @@ import kafka.cluster.{BrokerEndPoint, Partition, 
PartitionListener}
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log.remote.RemoteLogManager
 import kafka.log.{LogManager, UnifiedLog}
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, 
OffsetCheckpoints}
@@ -57,6 +56,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, 
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import java.io.File
 import java.nio.file.{Files, Paths}
@@ -194,7 +194,8 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedDeleteRecordsPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
                      delayedElectLeaderPurgatoryParam: 
Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
                      threadNamePrefix: Option[String] = None,
-                     ) extends Logging with KafkaMetricsGroup {
+                     ) extends Logging {
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
     DelayedOperationPurgatory[DelayedProduce](
@@ -246,16 +247,16 @@ class ReplicaManager(val config: KafkaConfig,
   // Visible for testing
   private[server] val replicaSelectorOpt: Option[ReplicaSelector] = 
createReplicaSelector()
 
-  newGauge("LeaderCount", () => leaderPartitionsIterator.size)
+  metricsGroup.newGauge("LeaderCount", () => leaderPartitionsIterator.size)
   // Visible for testing
-  private[kafka] val partitionCount = newGauge("PartitionCount", () => 
allPartitions.size)
-  newGauge("OfflineReplicaCount", () => offlinePartitionCount)
-  newGauge("UnderReplicatedPartitions", () => underReplicatedPartitionCount)
-  newGauge("UnderMinIsrPartitionCount", () => 
leaderPartitionsIterator.count(_.isUnderMinIsr))
-  newGauge("AtMinIsrPartitionCount", () => 
leaderPartitionsIterator.count(_.isAtMinIsr))
-  newGauge("ReassigningPartitions", () => reassigningPartitionsCount)
-  newGauge("PartitionsWithLateTransactionsCount", () => lateTransactionsCount)
-  newGauge("ProducerIdCount", () => producerIdCount)
+  private[kafka] val partitionCount = metricsGroup.newGauge("PartitionCount", 
() => allPartitions.size)
+  metricsGroup.newGauge("OfflineReplicaCount", () => offlinePartitionCount)
+  metricsGroup.newGauge("UnderReplicatedPartitions", () => 
underReplicatedPartitionCount)
+  metricsGroup.newGauge("UnderMinIsrPartitionCount", () => 
leaderPartitionsIterator.count(_.isUnderMinIsr))
+  metricsGroup.newGauge("AtMinIsrPartitionCount", () => 
leaderPartitionsIterator.count(_.isAtMinIsr))
+  metricsGroup.newGauge("ReassigningPartitions", () => 
reassigningPartitionsCount)
+  metricsGroup.newGauge("PartitionsWithLateTransactionsCount", () => 
lateTransactionsCount)
+  metricsGroup.newGauge("ProducerIdCount", () => producerIdCount)
 
   def reassigningPartitionsCount: Int = 
leaderPartitionsIterator.count(_.isReassigning)
 
@@ -266,9 +267,9 @@ class ReplicaManager(val config: KafkaConfig,
 
   def producerIdCount: Int = 
onlinePartitionsIterator.map(_.producerIdCount).sum
 
-  val isrExpandRate: Meter = newMeter("IsrExpandsPerSec", "expands", 
TimeUnit.SECONDS)
-  val isrShrinkRate: Meter = newMeter("IsrShrinksPerSec", "shrinks", 
TimeUnit.SECONDS)
-  val failedIsrUpdatesRate: Meter = newMeter("FailedIsrUpdatesPerSec", 
"failedUpdates", TimeUnit.SECONDS)
+  val isrExpandRate: Meter = metricsGroup.newMeter("IsrExpandsPerSec", 
"expands", TimeUnit.SECONDS)
+  val isrShrinkRate: Meter = metricsGroup.newMeter("IsrShrinksPerSec", 
"shrinks", TimeUnit.SECONDS)
+  val failedIsrUpdatesRate: Meter = 
metricsGroup.newMeter("FailedIsrUpdatesPerSec", "failedUpdates", 
TimeUnit.SECONDS)
 
   def underReplicatedPartitionCount: Int = 
leaderPartitionsIterator.count(_.isUnderReplicated)
 
@@ -1921,15 +1922,15 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def removeMetrics(): Unit = {
-    removeMetric("LeaderCount")
-    removeMetric("PartitionCount")
-    removeMetric("OfflineReplicaCount")
-    removeMetric("UnderReplicatedPartitions")
-    removeMetric("UnderMinIsrPartitionCount")
-    removeMetric("AtMinIsrPartitionCount")
-    removeMetric("ReassigningPartitions")
-    removeMetric("PartitionsWithLateTransactionsCount")
-    removeMetric("ProducerIdCount")
+    metricsGroup.removeMetric("LeaderCount")
+    metricsGroup.removeMetric("PartitionCount")
+    metricsGroup.removeMetric("OfflineReplicaCount")
+    metricsGroup.removeMetric("UnderReplicatedPartitions")
+    metricsGroup.removeMetric("UnderMinIsrPartitionCount")
+    metricsGroup.removeMetric("AtMinIsrPartitionCount")
+    metricsGroup.removeMetric("ReassigningPartitions")
+    metricsGroup.removeMetric("PartitionsWithLateTransactionsCount")
+    metricsGroup.removeMetric("ProducerIdCount")
   }
 
   def beginControlledShutdown(): Unit = {
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala 
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 77e42b21dd6..97cf2b2f713 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -20,7 +20,6 @@ import java.util
 import java.util.Properties
 import kafka.admin.{AdminOperationException, AdminUtils}
 import kafka.common.TopicAlreadyMarkedForDeletionException
-import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs, 
toLoggableProps}
 import kafka.server.DynamicConfig.QuotaConfigs
 import kafka.server.metadata.ZkConfigRepository
@@ -69,7 +68,7 @@ object ZkAdminManager {
 class ZkAdminManager(val config: KafkaConfig,
                      val metrics: Metrics,
                      val metadataCache: MetadataCache,
-                     val zkClient: KafkaZkClient) extends Logging with 
KafkaMetricsGroup {
+                     val zkClient: KafkaZkClient) extends Logging {
 
   this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
 
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 789ae89f049..a12f25de471 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -16,10 +16,11 @@
  */
 package kafka.server.metadata
 
+import kafka.utils.Logging
+
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.CompletableFuture
-import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
@@ -42,7 +43,7 @@ class BrokerMetadataListener(
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
   _metadataLoadingFaultHandler: FaultHandler
-) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+) extends RaftClient.Listener[ApiMessageAndVersion] with Logging {
 
   private val metadataFaultOccurred = new AtomicBoolean(false)
   private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 6b7701485e2..212909101f2 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -17,8 +17,6 @@
 
 package kafka.server.metadata
 
-import kafka.metrics.KafkaMetricsGroup
-
 import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics.Gauge
@@ -26,19 +24,20 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.MetricConfig
 import org.apache.kafka.image.MetadataProvenance
 import org.apache.kafka.image.loader.MetadataLoaderMetrics
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 
+import java.util.Collections
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
 final class BrokerServerMetrics private (
   metrics: Metrics
-) extends MetadataLoaderMetrics with KafkaMetricsGroup {
+) extends MetadataLoaderMetrics {
   import BrokerServerMetrics._
 
-  private val batchProcessingTimeHistName = explicitMetricName("kafka.server",
+  private val batchProcessingTimeHistName = 
KafkaMetricsGroup.explicitMetricName("kafka.server",
     "BrokerMetadataListener",
     "MetadataBatchProcessingTimeUs",
-    Map.empty)
+    Collections.emptyMap())
 
   /**
    * A histogram tracking the time in microseconds it took to process batches 
of events.
@@ -46,10 +45,10 @@ final class BrokerServerMetrics private (
   private val batchProcessingTimeHist =
     
KafkaYammerMetrics.defaultRegistry().newHistogram(batchProcessingTimeHistName, 
true)
 
-  private val batchSizeHistName = explicitMetricName("kafka.server",
+  private val batchSizeHistName = 
KafkaMetricsGroup.explicitMetricName("kafka.server",
     "BrokerMetadataListener",
     "MetadataBatchSizes",
-    Map.empty)
+    Collections.emptyMap())
 
   /**
    * A histogram tracking the sizes of batches that we have processed.
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 067e8aee84c..65c1013fcbf 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch
 import java.util.regex.Pattern
 import java.util.{Collections, Properties}
 import kafka.consumer.BaseConsumerRecord
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
@@ -34,6 +33,7 @@ import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
@@ -61,7 +61,8 @@ import scala.util.{Failure, Success, Try}
  * @deprecated Since 3.0, use the Connect-based MirrorMaker instead (aka MM2).
  */
 @deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).", 
since = "3.0")
-object MirrorMaker extends Logging with KafkaMetricsGroup {
+object MirrorMaker extends Logging {
+  private val metricsGroup = new KafkaMetricsGroup(MirrorMaker.getClass)
 
   private[tools] var producer: MirrorMakerProducer = _
   private var mirrorMakerThreads: Seq[MirrorMakerThread] = _
@@ -78,7 +79,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   // If a message send failed after retries are exhausted. The offset of the 
messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that 
offset. In this case, the offset of that
   // message was not really acked, but was skipped. This metric records the 
number of skipped offsets.
-  newGauge("MirrorMaker-numDroppedMessages", () => numDroppedMessages.get())
+  metricsGroup.newGauge("MirrorMaker-numDroppedMessages", () => 
numDroppedMessages.get())
 
   def main(args: Array[String]): Unit = {
 
@@ -185,7 +186,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   class MirrorMakerThread(consumerWrapper: ConsumerWrapper,
-                          val threadId: Int) extends Thread with Logging with 
KafkaMetricsGroup {
+                          val threadId: Int) extends Thread with Logging {
     private val threadName = "mirrormaker-thread-" + threadId
     private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
     private var lastOffsetCommitMs = System.currentTimeMillis()
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala 
b/core/src/main/scala/kafka/utils/Throttler.scala
index a431db5f006..824c7dcfc28 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -17,8 +17,8 @@
 
 package kafka.utils
 
-import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import java.util.concurrent.TimeUnit
 import java.util.Random
@@ -41,10 +41,12 @@ class Throttler(@volatile var desiredRatePerSec: Double,
                 throttleDown: Boolean = true,
                 metricName: String = "throttler",
                 units: String = "entries",
-                time: Time = Time.SYSTEM) extends Logging with 
KafkaMetricsGroup {
-  
+                time: Time = Time.SYSTEM) extends Logging {
+
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
   private val lock = new Object
-  private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
+  private val meter = metricsGroup.newMeter(metricName, units, 
TimeUnit.SECONDS)
   private val checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs)
   private var periodStartNs: Long = time.nanoseconds
   private var observedSoFar: Double = 0.0
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 78dda799f17..056cc15a7e9 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,6 @@ import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
 import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, 
ReplicaAssignment}
-import kafka.metrics.KafkaMetricsGroup
 import kafka.security.authorizer.AclAuthorizer.{NoAcls, VersionedAcls}
 import kafka.security.authorizer.AclEntry
 import kafka.server.{ConfigType, KafkaConfig}
@@ -35,6 +34,7 @@ import 
org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult, 
SetDataResult}
@@ -43,6 +43,7 @@ import org.apache.zookeeper.common.ZKConfig
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper}
 
+import java.util
 import java.lang.{Long => JLong}
 import scala.collection.{Map, Seq, mutable}
 
@@ -59,13 +60,15 @@ case class SuccessfulRegistrationResult(zkControllerEpoch: 
Int, controllerEpochZ
  * monolithic [[kafka.zk.ZkData]] is the way to go.
  */
 class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: 
Boolean, time: Time) extends AutoCloseable with
-  Logging with KafkaMetricsGroup {
+  Logging {
 
-  override def metricName(name: String, metricTags: 
scala.collection.Map[String, String]): MetricName = {
-    explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, 
metricTags)
+  private val metricsGroup: KafkaMetricsGroup = new 
KafkaMetricsGroup(this.getClass) {
+    override def metricName(name: String, metricTags: util.Map[String, 
String]): MetricName = {
+      KafkaMetricsGroup.explicitMetricName("kafka.server", 
"ZooKeeperClientMetrics", name, metricTags)
+    }
   }
 
-  private val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
+  private val latencyMetric = 
metricsGroup.newHistogram("ZooKeeperRequestLatencyMs")
 
   import KafkaZkClient._
 
@@ -1627,7 +1630,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
    * Close the underlying ZooKeeperClient.
    */
   def close(): Unit = {
-    removeMetric("ZooKeeperRequestLatencyMs")
+    metricsGroup.removeMetric("ZooKeeperRequestLatencyMs")
     zooKeeperClient.close()
   }
 
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 531cf468adc..04c9184791c 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -22,12 +22,12 @@ import java.util.concurrent.locks.{ReentrantLock, 
ReentrantReadWriteLock}
 import java.util.concurrent._
 import java.util.{List => JList}
 import com.yammer.metrics.core.MetricName
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
 import kafka.utils.Logging
 import kafka.zookeeper.ZooKeeperClient._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.zookeeper.AsyncCallback.{Children2Callback, DataCallback, 
StatCallback}
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
@@ -36,6 +36,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper._
 import org.apache.zookeeper.client.ZKClientConfig
 
+import java.util
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.collection.mutable.Set
@@ -62,7 +63,14 @@ class ZooKeeperClient(connectString: String,
                       metricGroup: String,
                       metricType: String,
                       private[zookeeper] val clientConfig: ZKClientConfig,
-                      name: String) extends Logging with KafkaMetricsGroup {
+                      name: String) extends Logging {
+
+  private val metricsGroup: KafkaMetricsGroup = new 
KafkaMetricsGroup(this.getClass) {
+    override def metricName(name: String, metricTags: util.Map[String, 
String]): MetricName = {
+      KafkaMetricsGroup.explicitMetricName(metricGroup, metricType, name, 
metricTags)
+    }
+  }
+
 
   this.logIdent = s"[ZooKeeperClient $name] "
   private val initializationLock = new ReentrantReadWriteLock()
@@ -91,7 +99,7 @@ class ZooKeeperClient(connectString: String,
     stateToEventTypeMap.map { case (state, eventType) =>
       val name = s"ZooKeeper${eventType}PerSec"
       metricNames += name
-      state -> newMeter(name, eventType.toLowerCase(Locale.ROOT), 
TimeUnit.SECONDS)
+      state -> metricsGroup.newMeter(name, eventType.toLowerCase(Locale.ROOT), 
TimeUnit.SECONDS)
     }
   }
 
@@ -100,7 +108,7 @@ class ZooKeeperClient(connectString: String,
   @volatile private var zooKeeper = new ZooKeeper(connectString, 
sessionTimeoutMs, ZooKeeperClientWatcher,
     clientConfig)
 
-  newGauge("SessionState", () => connectionState.toString)
+  metricsGroup.newGauge("SessionState", () => connectionState.toString)
 
   metricNames += "SessionState"
 
@@ -112,10 +120,6 @@ class ZooKeeperClient(connectString: String,
       throw e
   }
 
-  override def metricName(name: String, metricTags: 
scala.collection.Map[String, String]): MetricName = {
-    explicitMetricName(metricGroup, metricType, name, metricTags)
-  }
-
   /**
    * Return the state of the ZooKeeper connection.
    */
@@ -344,7 +348,7 @@ class ZooKeeperClient(connectString: String,
       zNodeChildChangeHandlers.clear()
       stateChangeHandlers.clear()
       zooKeeper.close()
-      metricNames.foreach(removeMetric(_))
+      metricNames.foreach(metricsGroup.removeMetric(_))
     }
     info("Closed.")
   }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3052e166a94..e24530918b5 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -2572,7 +2572,7 @@ class GroupMetadataManagerTest {
   }
 
   private def getGauge(manager: GroupMetadataManager, name: String): 
Gauge[Int]  = {
-    
KafkaYammerMetrics.defaultRegistry().allMetrics().get(manager.metricName(name, 
Map.empty)).asInstanceOf[Gauge[Int]]
+    
KafkaYammerMetrics.defaultRegistry().allMetrics().get(manager.metricsGroup.metricName(name,
 Collections.emptyMap())).asInstanceOf[Gauge[Int]]
   }
 
   private def expectMetrics(manager: GroupMetadataManager,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index ae09f7633cc..efdcae25355 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -20,7 +20,6 @@ package kafka.log
 import java.io.PrintWriter
 
 import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{CompressionType, RecordBatch}
@@ -34,7 +33,7 @@ import scala.jdk.CollectionConverters._
 /**
   * This is an integration test that tests the fully integrated log cleaner
   */
-class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with 
KafkaMetricsGroup {
+class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
 
   val codec: CompressionType = CompressionType.LZ4
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index f0c9256d506..fdfa61c10e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1888,7 +1888,7 @@ class LogCleanerTest {
       time = time)
 
     def checkGauge(name: String): Unit = {
-      val gauge = logCleaner.newGauge(name, () => 999)
+      val gauge = logCleaner.metricsGroup.newGauge(name, () => 999)
       // if there is no cleaners, 0 is default value
       assertEquals(0, gauge.value())
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a1f842c6c67..465bca9af1b 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -679,10 +679,10 @@ class LogManagerTest {
   }
 
   private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, 
expectedParams: Map[String, Int]): Unit = {
-    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    val logManagerClassName = classOf[LogManager].getSimpleName
     // get all `remainingLogsToRecover` metrics
     val logMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-      .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+      .filter { case (metric, _) => metric.getType == s"$logManagerClassName" 
&& metric.getName == "remainingLogsToRecover" }
       .map { case (_, gauge) => gauge }
       .asInstanceOf[ArrayBuffer[Gauge[Int]]]
 
@@ -709,10 +709,10 @@ class LogManagerTest {
                                                      
recoveryThreadsPerDataDir: Int,
                                                      mockMap: 
ConcurrentHashMap[String, Int],
                                                      expectedParams: 
Map[String, Int]): Unit = {
-    val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+    val logManagerClassName = classOf[LogManager].getSimpleName
     // get all `remainingSegmentsToRecover` metrics
     val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = 
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-          .filter { case (metric, _) => metric.getType == 
s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+          .filter { case (metric, _) => metric.getType == 
s"$logManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
           .map { case (_, gauge) => gauge }
           .asInstanceOf[ArrayBuffer[Gauge[Int]]]
 
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala 
b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
index 918553589d6..2b0c46c5c46 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
@@ -17,18 +17,23 @@
 
 package kafka.metrics
 
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
+
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
 import org.junit.jupiter.api.Test
 
+import java.util.Collections
+import scala.jdk.CollectionConverters._
+
 class KafkaMetricsGroupTest {
 
   @Test
   def testUntaggedMetricName(): Unit = {
     val metricName = KafkaMetricsGroup.explicitMetricName(
-      group = "kafka.metrics",
-      typeName = "TestMetrics",
-      name = "TaggedMetric",
-      Map.empty
+      "kafka.metrics",
+      "TestMetrics",
+      "TaggedMetric",
+      Collections.emptyMap()
     )
 
     assertEquals("kafka.metrics", metricName.getGroup)
@@ -41,11 +46,11 @@ class KafkaMetricsGroupTest {
 
   @Test
   def testTaggedMetricName(): Unit = {
-    val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz")
+    val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz").asJava
     val metricName = KafkaMetricsGroup.explicitMetricName(
-      group = "kafka.metrics",
-      typeName = "TestMetrics",
-      name = "TaggedMetric",
+      "kafka.metrics",
+      "TestMetrics",
+      "TaggedMetric",
       tags
     )
 
@@ -59,11 +64,11 @@ class KafkaMetricsGroupTest {
 
   @Test
   def testTaggedMetricNameWithEmptyValue(): Unit = {
-    val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz")
+    val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz").asJava
     val metricName = KafkaMetricsGroup.explicitMetricName(
-      group = "kafka.metrics",
-      typeName = "TestMetrics",
-      name = "TaggedMetric",
+      "kafka.metrics",
+      "TestMetrics",
+      "TaggedMetric",
       tags
     )
 
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 3131eb57985..db619e2e0a2 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.metrics.JmxReporter
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.params.ParameterizedTest
@@ -158,7 +159,7 @@ class MetricsTest extends KafkaServerTestHarness with 
Logging {
     val path = "C:\\windows-path\\kafka-logs"
     val tags = Map("dir" -> path)
     val expectedMBeanName = Set(tags.keySet.head, 
ObjectName.quote(path)).mkString("=")
-    val metric = KafkaMetricsGroup.metricName("test-metric", tags)
+    val metric = new 
KafkaMetricsGroup(this.getClass).metricName("test-metric", tags.asJava)
     assert(metric.getMBeanName.endsWith(expectedMBeanName))
   }
 
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala 
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 2ebcc37306d..725435cc67d 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -22,7 +22,6 @@ import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
 import java.util.{Collections, Properties}
 import com.yammer.metrics.core.Meter
-import kafka.metrics.KafkaMetricsGroup
 import kafka.network.Processor.ListenerMetricTag
 import kafka.server.KafkaConfig
 import kafka.utils.Implicits.MapExtensionMethods
@@ -33,6 +32,7 @@ import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Metrics}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api._
 
@@ -88,8 +88,8 @@ class ConnectionQuotasTest {
     TestUtils.clearYammerMetrics()
 
     listeners.keys.foreach { name =>
-        blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter(
-          s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, 
Map(ListenerMetricTag -> name)))
+        blockedPercentMeters.put(name, new 
KafkaMetricsGroup(this.getClass).newMeter(
+          s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, 
Map(ListenerMetricTag -> name).asJava))
     }
     // use system time, because ConnectionQuota causes the current thread to 
wait with timeout, which waits based on
     // system time; so using mock time will likely result in test flakiness 
due to a mixed use of mock and system time
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
 
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
new file mode 100644
index 00000000000..4560cd3f22c
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> ownerClass;
+
+    public KafkaMetricsGroup(Class<?> klass) {
+        this.ownerClass = klass;
+    }
+
+    /**
+     * Builds the MetricName used by the gauges, meters, histograms and timers of this group:
+     * the group is the package name of the owning class (empty if it has no package) and the
+     * type is its simple name with one trailing '$' removed.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(String name, Map<String, String> tags) {
+        Package owningPackage = ownerClass.getPackage();
+        String group = owningPackage == null ? "" : owningPackage.getName();
+        String typeName = ownerClass.getSimpleName().replaceAll("\\$$", "");
+        return explicitMetricName(group, typeName, name, tags);
+    }
+
+    /**
+     * Builds a MetricName whose mBean name is {@code group:type=typeName}, then {@code ,name=}
+     * plus the name when it is not empty, then the tags rendered by toMBeanName, if any.
+     * The scope comes from toScope and is null when no tag has a non-empty value.
+     */
+    public static MetricName explicitMetricName(String group, String typeName,
+                                                String name, Map<String, String> tags) {
+        StringBuilder mBeanName = new StringBuilder(100).append(group).append(":type=").append(typeName);
+        if (!name.isEmpty()) {
+            mBeanName.append(",name=").append(name);
+        }
+
+        String scope = toScope(tags).orElse(null);
+        toMBeanName(tags).ifPresent(tagsName -> mBeanName.append(",").append(tagsName));
+
+        return new MetricName(group, typeName, name, scope, mBeanName.toString());
+    }
+
+    /** Registers {@code metric} as a gauge named by {@code metricName(name, tags)}. */
+    public final <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric);
+    }
+
+    /** Registers {@code metric} as a gauge named by {@code name} with an empty tag map. */
+    public final <T> Gauge<T> newGauge(String name, Gauge<T> metric) {
+        return newGauge(name, metric, Collections.emptyMap());
+    }
+
+    /** Creates a meter named by {@code metricName(name, tags)}. */
+    public final Meter newMeter(String name, String eventType, TimeUnit timeUnit, Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit);
+    }
+
+    /** Creates a meter named by {@code name} with an empty tag map. */
+    public final Meter newMeter(String name, String eventType, TimeUnit timeUnit) {
+        return newMeter(name, eventType, timeUnit, Collections.emptyMap());
+    }
+
+    /** Creates a meter under an already built {@code metricName}. */
+    public final Meter newMeter(MetricName metricName, String eventType, TimeUnit timeUnit) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName, eventType, timeUnit);
+    }
+
+    /** Creates a histogram named by {@code metricName(name, tags)}. */
+    public final Histogram newHistogram(String name, boolean biased, Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags), biased);
+    }
+
+    /** Creates a histogram with an empty tag map, passing {@code true} for {@code biased}. */
+    public final Histogram newHistogram(String name) {
+        return newHistogram(name, true, Collections.emptyMap());
+    }
+
+    /** Creates a timer named by {@code metricName(name, tags)}. */
+    public final Timer newTimer(String name, TimeUnit durationUnit, TimeUnit rateUnit, Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit);
+    }
+
+    /** Creates a timer named by {@code name} with an empty tag map. */
+    public final Timer newTimer(String name, TimeUnit durationUnit, TimeUnit rateUnit) {
+        return newTimer(name, durationUnit, rateUnit, Collections.emptyMap());
+    }
+
+    /** Removes the metric named by {@code metricName(name, tags)} from the registry. */
+    public final void removeMetric(String name, Map<String, String> tags) {
+        KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name, tags));
+    }
+
+    /** Removes the metric named by {@code name} with an empty tag map. */
+    public final void removeMetric(String name) {
+        removeMetric(name, Collections.emptyMap());
+    }
+
+    /** Joins the tags whose value is not empty as comma separated {@code key=sanitizedValue}. */
+    private static Optional<String> toMBeanName(Map<String, String> tags) {
+        List<String> pairs = tags.entrySet().stream()
+                .filter(tag -> !tag.getValue().isEmpty())
+                .map(tag -> tag.getKey() + "=" + Sanitizer.jmxSanitize(tag.getValue()))
+                .collect(Collectors.toList());
+        return pairs.isEmpty() ? Optional.empty() : Optional.of(String.join(",", pairs));
+    }
+
+    /** Joins the tags whose value is not empty as dot separated {@code key.value}, sorted by key. */
+    private static Optional<String> toScope(Map<String, String> tags) {
+        // convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
+        List<String> parts = tags.entrySet().stream()
+                .filter(tag -> !tag.getValue().isEmpty())
+                .sorted(Map.Entry.comparingByKey())
+                .map(tag -> tag.getKey() + "." + tag.getValue().replaceAll("\\.", "_"))
+                .collect(Collectors.toList());
+        return parts.isEmpty() ? Optional.empty() : Optional.of(String.join(".", parts));
+    }
+}

Reply via email to