This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e28e0bf0f2c KAFKA-14524: Rewrite KafkaMetricsGroup in Java (#13067)
e28e0bf0f2c is described below
commit e28e0bf0f2c21206abccfffb280605dd02404678
Author: Ivan Yurchenko <[email protected]>
AuthorDate: Wed Mar 8 10:29:51 2023 +0000
KAFKA-14524: Rewrite KafkaMetricsGroup in Java (#13067)
* KAFKA-14524: Rewrite KafkaMetricsGroup in Java
Instead of being a base trait for classes, `KafkaMetricsGroup` is now an
independent object. User classes could override methods in it to adjust its
behavior like they used to with the trait model.
Some classes were extending the `KafkaMetricsGroup` trait, but it wasn't
actually used.
Reviewers: Ismael Juma <[email protected]>, lbownik
<[email protected]>, Satish Duggana <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 38 +++---
.../controller/ControllerChannelManager.scala | 18 +--
.../kafka/controller/ControllerEventManager.scala | 14 +-
.../scala/kafka/controller/KafkaController.scala | 42 +++---
.../coordinator/group/GroupMetadataManager.scala | 10 +-
.../TransactionMarkerChannelManager.scala | 10 +-
core/src/main/scala/kafka/log/LocalLog.scala | 3 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 17 +--
.../main/scala/kafka/log/LogCleanerManager.scala | 18 +--
core/src/main/scala/kafka/log/LogManager.scala | 28 ++--
core/src/main/scala/kafka/log/LogSegment.scala | 7 +-
core/src/main/scala/kafka/log/UnifiedLog.scala | 35 ++---
.../scala/kafka/log/remote/RemoteLogManager.scala | 3 +-
.../scala/kafka/metrics/KafkaMetricsGroup.scala | 111 ----------------
.../main/scala/kafka/network/RequestChannel.scala | 86 +++++++-----
.../main/scala/kafka/network/SocketServer.scala | 40 +++---
.../kafka/server/AbstractFetcherManager.scala | 17 ++-
.../scala/kafka/server/AbstractFetcherThread.scala | 25 ++--
.../scala/kafka/server/AlterPartitionManager.scala | 3 +-
.../main/scala/kafka/server/ControllerServer.scala | 16 ++-
.../scala/kafka/server/DelayedDeleteRecords.scala | 8 +-
.../src/main/scala/kafka/server/DelayedFetch.scala | 10 +-
.../main/scala/kafka/server/DelayedOperation.scala | 17 ++-
.../main/scala/kafka/server/DelayedProduce.scala | 12 +-
.../kafka/server/DelegationTokenManager.scala | 3 +-
.../src/main/scala/kafka/server/FetchSession.scala | 24 ++--
core/src/main/scala/kafka/server/KafkaBroker.scala | 28 ++--
.../scala/kafka/server/KafkaRequestHandler.scala | 22 +--
.../main/scala/kafka/server/ReplicaManager.scala | 47 +++----
.../main/scala/kafka/server/ZkAdminManager.scala | 3 +-
.../server/metadata/BrokerMetadataListener.scala | 5 +-
.../server/metadata/BrokerServerMetrics.scala | 15 +--
core/src/main/scala/kafka/tools/MirrorMaker.scala | 9 +-
core/src/main/scala/kafka/utils/Throttler.scala | 10 +-
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 15 ++-
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 22 +--
.../group/GroupMetadataManagerTest.scala | 2 +-
.../unit/kafka/log/LogCleanerIntegrationTest.scala | 3 +-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogManagerTest.scala | 8 +-
.../unit/kafka/metrics/KafkaMetricsGroupTest.scala | 29 ++--
.../scala/unit/kafka/metrics/MetricsTest.scala | 3 +-
.../unit/kafka/network/ConnectionQuotasTest.scala | 6 +-
.../kafka/server/metrics/KafkaMetricsGroup.java | 147 +++++++++++++++++++++
44 files changed, 555 insertions(+), 436 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 80af26eda7f..1a0a6ab1b58 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -23,7 +23,6 @@ import kafka.api.LeaderAndIsr
import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._
-import kafka.metrics.KafkaMetricsGroup
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
@@ -45,6 +44,7 @@ import org.apache.kafka.common.{IsolationLevel,
TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
FetchIsolation, FetchParams, LeaderHwChange, LogAppendInfo, LogOffsetMetadata,
LogOffsetSnapshot, LogOffsetsListener, LogReadInfo,
LogStartOffsetIncrementReason}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -100,7 +100,9 @@ class DelayedOperations(topicPartition: TopicPartition,
def numDelayedDelete: Int = deleteRecords.numDelayed
}
-object Partition extends KafkaMetricsGroup {
+object Partition {
+ private val metricsGroup = new KafkaMetricsGroup(classOf[Partition])
+
def apply(topicPartition: TopicPartition,
time: Time,
replicaManager: ReplicaManager): Partition = {
@@ -136,13 +138,13 @@ object Partition extends KafkaMetricsGroup {
}
def removeMetrics(topicPartition: TopicPartition): Unit = {
- val tags = Map("topic" -> topicPartition.topic, "partition" ->
topicPartition.partition.toString)
- removeMetric("UnderReplicated", tags)
- removeMetric("UnderMinIsr", tags)
- removeMetric("InSyncReplicasCount", tags)
- removeMetric("ReplicasCount", tags)
- removeMetric("LastStableOffsetLag", tags)
- removeMetric("AtMinIsr", tags)
+ val tags = Map("topic" -> topicPartition.topic, "partition" ->
topicPartition.partition.toString).asJava
+ metricsGroup.removeMetric("UnderReplicated", tags)
+ metricsGroup.removeMetric("UnderMinIsr", tags)
+ metricsGroup.removeMetric("InSyncReplicasCount", tags)
+ metricsGroup.removeMetric("ReplicasCount", tags)
+ metricsGroup.removeMetric("LastStableOffsetLag", tags)
+ metricsGroup.removeMetric("AtMinIsr", tags)
}
}
@@ -284,7 +286,9 @@ class Partition(val topicPartition: TopicPartition,
delayedOperations: DelayedOperations,
metadataCache: MetadataCache,
logManager: LogManager,
- alterIsrManager: AlterPartitionManager) extends Logging with
KafkaMetricsGroup {
+ alterIsrManager: AlterPartitionManager) extends Logging {
+
+ import Partition.metricsGroup
def topic: String = topicPartition.topic
def partitionId: Int = topicPartition.partition
@@ -334,14 +338,14 @@ class Partition(val topicPartition: TopicPartition,
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
- private val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
+ private val tags = Map("topic" -> topic, "partition" ->
partitionId.toString).asJava
- newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)
- newGauge("InSyncReplicasCount", () => if (isLeader) partitionState.isr.size
else 0, tags)
- newGauge("UnderMinIsr", () => if (isUnderMinIsr) 1 else 0, tags)
- newGauge("AtMinIsr", () => if (isAtMinIsr) 1 else 0, tags)
- newGauge("ReplicasCount", () => if (isLeader)
assignmentState.replicationFactor else 0, tags)
- newGauge("LastStableOffsetLag", () =>
log.map(_.lastStableOffsetLag).getOrElse(0), tags)
+ metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else
0, tags)
+ metricsGroup.newGauge("InSyncReplicasCount", () => if (isLeader)
partitionState.isr.size else 0, tags)
+ metricsGroup.newGauge("UnderMinIsr", () => if (isUnderMinIsr) 1 else 0, tags)
+ metricsGroup.newGauge("AtMinIsr", () => if (isAtMinIsr) 1 else 0, tags)
+ metricsGroup.newGauge("ReplicasCount", () => if (isLeader)
assignmentState.replicationFactor else 0, tags)
+ metricsGroup.newGauge("LastStableOffsetLag", () =>
log.map(_.lastStableOffsetLag).getOrElse(0), tags)
def hasLateTransaction(currentTimeMs: Long): Boolean =
leaderLogIfLocal.exists(_.hasLateTransaction(currentTimeMs))
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 244c61d55a2..2d413a4eb20 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,6 @@ package kafka.controller
import com.yammer.metrics.core.{Gauge, Timer}
import kafka.api._
import kafka.cluster.Broker
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.utils.Implicits._
import kafka.utils._
@@ -37,6 +36,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion._
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import java.net.SocketTimeoutException
@@ -54,14 +54,16 @@ class ControllerChannelManager(controllerEpoch: () => Int,
time: Time,
metrics: Metrics,
stateChangeLogger: StateChangeLogger,
- threadNamePrefix: Option[String] = None)
extends Logging with KafkaMetricsGroup {
+ threadNamePrefix: Option[String] = None)
extends Logging {
import ControllerChannelManager._
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
protected val brokerStateInfo = new mutable.HashMap[Int,
ControllerBrokerStateInfo]
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
- newGauge("TotalQueueSize",
+ metricsGroup.newGauge("TotalQueueSize",
() => brokerLock synchronized {
brokerStateInfo.values.iterator.map(_.messageQueue.size).sum
}
@@ -169,7 +171,7 @@ class ControllerChannelManager(controllerEpoch: () => Int,
case Some(name) =>
s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
}
- val requestRateAndQueueTimeMetrics = newTimer(
+ val requestRateAndQueueTimeMetrics = metricsGroup.newTimer(
RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS,
TimeUnit.SECONDS, brokerMetricTags(broker.id)
)
@@ -177,13 +179,13 @@ class ControllerChannelManager(controllerEpoch: () => Int,
brokerNode, config, time, requestRateAndQueueTimeMetrics,
stateChangeLogger, threadName)
requestThread.setDaemon(false)
- val queueSizeGauge = newGauge(QueueSizeMetricName, () =>
messageQueue.size, brokerMetricTags(broker.id))
+ val queueSizeGauge = metricsGroup.newGauge(QueueSizeMetricName, () =>
messageQueue.size, brokerMetricTags(broker.id))
brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient,
brokerNode, messageQueue,
requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics,
reconfigurableChannelBuilder))
}
- private def brokerMetricTags(brokerId: Int) = Map("broker-id" ->
brokerId.toString)
+ private def brokerMetricTags(brokerId: Int) = Map("broker-id" ->
brokerId.toString).asJava
private def removeExistingBroker(brokerState: ControllerBrokerStateInfo):
Unit = {
try {
@@ -195,8 +197,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
brokerState.requestSendThread.shutdown()
brokerState.networkClient.close()
brokerState.messageQueue.clear()
- removeMetric(QueueSizeMetricName,
brokerMetricTags(brokerState.brokerNode.id))
- removeMetric(RequestRateAndQueueTimeMetricName,
brokerMetricTags(brokerState.brokerNode.id))
+ metricsGroup.removeMetric(QueueSizeMetricName,
brokerMetricTags(brokerState.brokerNode.id))
+ metricsGroup.removeMetric(RequestRateAndQueueTimeMetricName,
brokerMetricTags(brokerState.brokerNode.id))
brokerStateInfo.remove(brokerState.brokerNode.id)
} catch {
case e: Throwable => error("Error while removing broker by the
controller", e)
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index d8b2af41303..a1b4835ea97 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -23,10 +23,10 @@ import java.util.ArrayList
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantLock
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
import kafka.utils.Logging
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import scala.collection._
@@ -73,18 +73,20 @@ class ControllerEventManager(controllerId: Int,
processor: ControllerEventProcessor,
time: Time,
rateAndTimeMetrics: Map[ControllerState, Timer],
- eventQueueTimeTimeoutMs: Long = 300000) extends
KafkaMetricsGroup {
+ eventQueueTimeTimeoutMs: Long = 300000) {
import ControllerEventManager._
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
@volatile private var _state: ControllerState = ControllerState.Idle
private val putLock = new ReentrantLock()
private val queue = new LinkedBlockingQueue[QueuedEvent]
// Visible for test
private[controller] var thread = new
ControllerEventThread(ControllerEventThreadName)
- private val eventQueueTimeHist = newHistogram(EventQueueTimeMetricName)
+ private val eventQueueTimeHist =
metricsGroup.newHistogram(EventQueueTimeMetricName)
- newGauge(EventQueueSizeMetricName, () => queue.size)
+ metricsGroup.newGauge(EventQueueSizeMetricName, () => queue.size)
def state: ControllerState = _state
@@ -96,8 +98,8 @@ class ControllerEventManager(controllerId: Int,
clearAndPut(ShutdownEventThread)
thread.awaitShutdown()
} finally {
- removeMetric(EventQueueTimeMetricName)
- removeMetric(EventQueueSizeMetricName)
+ metricsGroup.removeMetric(EventQueueTimeMetricName)
+ metricsGroup.removeMetric(EventQueueSizeMetricName)
}
}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala
b/core/src/main/scala/kafka/controller/KafkaController.scala
index ebe768d4fb1..64ae5ad39c1 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -25,7 +25,6 @@ import kafka.common._
import kafka.cluster.Broker
import kafka.controller.KafkaController.{AlterReassignmentsCallback,
ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
-import kafka.metrics.KafkaMetricsGroup
import kafka.server._
import kafka.server.metadata.ZkFinalizedFeatureCache
import kafka.utils._
@@ -47,6 +46,7 @@ import
org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, Leade
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.ProducerIdsBlock
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
@@ -81,7 +81,9 @@ class KafkaController(val config: KafkaConfig,
brokerFeatures: BrokerFeatures,
featureCache: ZkFinalizedFeatureCache,
threadNamePrefix: Option[String] = None)
- extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
+ extends ControllerEventProcessor with Logging {
+
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
this.logIdent = s"[Controller id=${config.brokerId}] "
@@ -142,19 +144,19 @@ class KafkaController(val config: KafkaConfig,
/* single-thread scheduler to clean expired tokens */
private val tokenCleanScheduler = new KafkaScheduler(1, true,
"delegation-token-cleaner")
- newGauge("ActiveControllerCount", () => if (isActive) 1 else 0)
- newGauge("OfflinePartitionsCount", () => offlinePartitionCount)
- newGauge("PreferredReplicaImbalanceCount", () =>
preferredReplicaImbalanceCount)
- newGauge("ControllerState", () => state.value)
- newGauge("GlobalTopicCount", () => globalTopicCount)
- newGauge("GlobalPartitionCount", () => globalPartitionCount)
- newGauge("TopicsToDeleteCount", () => topicsToDeleteCount)
- newGauge("ReplicasToDeleteCount", () => replicasToDeleteCount)
- newGauge("TopicsIneligibleToDeleteCount", () =>
ineligibleTopicsToDeleteCount)
- newGauge("ReplicasIneligibleToDeleteCount", () =>
ineligibleReplicasToDeleteCount)
- newGauge("ActiveBrokerCount", () => activeBrokerCount)
+ metricsGroup.newGauge("ActiveControllerCount", () => if (isActive) 1 else 0)
+ metricsGroup.newGauge("OfflinePartitionsCount", () => offlinePartitionCount)
+ metricsGroup.newGauge("PreferredReplicaImbalanceCount", () =>
preferredReplicaImbalanceCount)
+ metricsGroup.newGauge("ControllerState", () => state.value)
+ metricsGroup.newGauge("GlobalTopicCount", () => globalTopicCount)
+ metricsGroup.newGauge("GlobalPartitionCount", () => globalPartitionCount)
+ metricsGroup.newGauge("TopicsToDeleteCount", () => topicsToDeleteCount)
+ metricsGroup.newGauge("ReplicasToDeleteCount", () => replicasToDeleteCount)
+ metricsGroup.newGauge("TopicsIneligibleToDeleteCount", () =>
ineligibleTopicsToDeleteCount)
+ metricsGroup.newGauge("ReplicasIneligibleToDeleteCount", () =>
ineligibleReplicasToDeleteCount)
+ metricsGroup.newGauge("ActiveBrokerCount", () => activeBrokerCount)
// FencedBrokerCount metric is always 0 in the ZK controller.
- newGauge("FencedBrokerCount", () => 0)
+ metricsGroup.newGauge("FencedBrokerCount", () => 0)
/**
* Returns true if this broker is the current controller.
@@ -2727,15 +2729,21 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr:
LeaderAndIsr, controllerEpo
}
}
-private[controller] class ControllerStats extends KafkaMetricsGroup {
- val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",
"elections", TimeUnit.SECONDS)
+private[controller] class ControllerStats {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
+ val uncleanLeaderElectionRate =
metricsGroup.newMeter("UncleanLeaderElectionsPerSec", "elections",
TimeUnit.SECONDS)
val rateAndTimeMetrics: Map[ControllerState, Timer] =
ControllerState.values.flatMap { state =>
state.rateAndTimeMetricName.map { metricName =>
- state -> newTimer(metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+ state -> metricsGroup.newTimer(metricName, TimeUnit.MILLISECONDS,
TimeUnit.SECONDS)
}
}.toMap
+ // For test.
+ def removeMetric(name: String): Unit = {
+ metricsGroup.removeMetric(name)
+ }
}
sealed trait ControllerEvent {
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3e0317fbd9f..b45ee5f4736 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ConcurrentHashMap
import com.yammer.metrics.core.Gauge
import kafka.common.OffsetAndMetadata
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.{ReplicaManager, RequestLocal}
import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
@@ -46,6 +45,7 @@ import org.apache.kafka.common.{KafkaException,
MessageFormatter, TopicPartition
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue,
OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0,
IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation}
@@ -58,7 +58,9 @@ class GroupMetadataManager(brokerId: Int,
config: OffsetConfig,
val replicaManager: ReplicaManager,
time: Time,
- metrics: Metrics) extends Logging with
KafkaMetricsGroup {
+ metrics: Metrics) extends Logging {
+ // Visible for test.
+ private[group] val metricsGroup: KafkaMetricsGroup = new
KafkaMetricsGroup(this.getClass)
private val compressionType: CompressionType =
config.offsetsTopicCompressionType
@@ -123,8 +125,8 @@ class GroupMetadataManager(brokerId: Int,
this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "
private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
- removeMetric(name)
- newGauge(name, gauge)
+ metricsGroup.removeMetric(name)
+ metricsGroup.newGauge(name, gauge)
}
recreateGauge("NumOffsets",
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 94baf0f976d..049aae97c7e 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -21,7 +21,6 @@ import java.util
import java.util.concurrent.{BlockingQueue, ConcurrentHashMap,
LinkedBlockingQueue}
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.{KafkaConfig, MetadataCache, RequestLocal}
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging}
@@ -35,6 +34,7 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import scala.collection.{concurrent, immutable}
import scala.jdk.CollectionConverters._
@@ -133,7 +133,9 @@ class TransactionMarkerChannelManager(
txnStateManager: TransactionStateManager,
time: Time
) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId,
networkClient, config.requestTimeoutMs, time)
- with Logging with KafkaMetricsGroup {
+ with Logging {
+
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId +
"]: "
@@ -151,8 +153,8 @@ class TransactionMarkerChannelManager(
if (config.interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 1
else 0
- newGauge("UnknownDestinationQueueSize", () =>
markersQueueForUnknownBroker.totalNumMarkers)
- newGauge("LogAppendRetryQueueSize", () => txnLogAppendRetryQueue.size)
+ metricsGroup.newGauge("UnknownDestinationQueueSize", () =>
markersQueueForUnknownBroker.totalNumMarkers)
+ metricsGroup.newGauge("LogAppendRetryQueueSize", () =>
txnLogAppendRetryQueue.size)
override def shutdown(): Unit = {
super.shutdown()
diff --git a/core/src/main/scala/kafka/log/LocalLog.scala
b/core/src/main/scala/kafka/log/LocalLog.scala
index 97d3f5db685..34442e3c3f5 100644
--- a/core/src/main/scala/kafka/log/LocalLog.scala
+++ b/core/src/main/scala/kafka/log/LocalLog.scala
@@ -17,7 +17,6 @@
package kafka.log
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Logging
import org.apache.kafka.common.errors.{KafkaStorageException,
OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
@@ -69,7 +68,7 @@ class LocalLog(@volatile private var _dir: File,
private[log] val scheduler: Scheduler,
private[log] val time: Time,
private[log] val topicPartition: TopicPartition,
- private[log] val logDirFailureChannel: LogDirFailureChannel)
extends Logging with KafkaMetricsGroup {
+ private[log] val logDirFailureChannel: LogDirFailureChannel)
extends Logging {
import kafka.log.LocalLog._
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala
b/core/src/main/scala/kafka/log/LogCleaner.scala
index f03211b2bdb..25511976e31 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -22,7 +22,6 @@ import java.nio._
import java.util.Date
import java.util.concurrent.TimeUnit
import kafka.common._
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -32,6 +31,7 @@ import
org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig,
LastRecord, LogDirFailureChannel, OffsetMap, SkimpyOffsetMap, TransactionIndex}
@@ -96,8 +96,9 @@ class LogCleaner(initialConfig: CleanerConfig,
val logDirs: Seq[File],
val logs: Pool[TopicPartition, UnifiedLog],
val logDirFailureChannel: LogDirFailureChannel,
- time: Time = Time.SYSTEM) extends Logging with
KafkaMetricsGroup with BrokerReconfigurable
-{
+ time: Time = Time.SYSTEM) extends Logging with
BrokerReconfigurable {
+ // Visible for test.
+ private[log] val metricsGroup = new KafkaMetricsGroup(this.getClass)
/* Log cleaner configuration which may be dynamically updated */
@volatile private var config = initialConfig
@@ -125,27 +126,27 @@ class LogCleaner(initialConfig: CleanerConfig,
/* a metric to track the maximum utilization of any thread's buffer in the
last cleaning */
- newGauge("max-buffer-utilization-percent",
+ metricsGroup.newGauge("max-buffer-utilization-percent",
() => maxOverCleanerThreads(_.lastStats.bufferUtilization) * 100)
/* a metric to track the recopy rate of each thread's last cleaning */
- newGauge("cleaner-recopy-percent", () => {
+ metricsGroup.newGauge("cleaner-recopy-percent", () => {
val stats = cleaners.map(_.lastStats)
val recopyRate = stats.iterator.map(_.bytesWritten).sum.toDouble /
math.max(stats.iterator.map(_.bytesRead).sum, 1)
(100 * recopyRate).toInt
})
/* a metric to track the maximum cleaning time for the last cleaning from
each thread */
- newGauge("max-clean-time-secs",
+ metricsGroup.newGauge("max-clean-time-secs",
() => maxOverCleanerThreads(_.lastStats.elapsedSecs))
// a metric to track delay between the time when a log is required to be
compacted
// as determined by max compaction lag and the time of last cleaner run.
- newGauge("max-compaction-delay-secs",
+ metricsGroup.newGauge("max-compaction-delay-secs",
() =>
maxOverCleanerThreads(_.lastPreCleanStats.maxCompactionDelayMs.toDouble) / 1000)
- newGauge("DeadThreadCount", () => deadThreadCount)
+ metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount)
private[log] def deadThreadCount: Int = cleaners.count(_.isThreadFailed)
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala
b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index db2738ba6d6..ef5df50ca8f 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kafka.common.LogCleaningAbortedException
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool}
@@ -30,8 +29,10 @@ import org.apache.kafka.common.{KafkaException,
TopicPartition}
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.utils.Time
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import scala.collection.{Iterable, Seq, mutable}
+import scala.jdk.CollectionConverters._
private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -60,9 +61,10 @@ private[log] class LogCleaningException(val log: UnifiedLog,
*/
private[log] class LogCleanerManager(val logDirs: Seq[File],
val logs: Pool[TopicPartition,
UnifiedLog],
- val logDirFailureChannel:
LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
+ val logDirFailureChannel:
LogDirFailureChannel) extends Logging {
import LogCleanerManager._
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
protected override def loggerName: String = classOf[LogCleaner].getName
@@ -88,15 +90,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
/* gauges for tracking the number of partitions marked as uncleanable for
each log directory */
for (dir <- logDirs) {
- newGauge("uncleanable-partitions-count",
+ metricsGroup.newGauge("uncleanable-partitions-count",
() => inLock(lock) {
uncleanablePartitions.get(dir.getAbsolutePath).map(_.size).getOrElse(0) },
- Map("logDirectory" -> dir.getAbsolutePath)
+ Map("logDirectory" -> dir.getAbsolutePath).asJava
)
}
/* gauges for tracking the number of uncleanable bytes from uncleanable
partitions for each log directory */
for (dir <- logDirs) {
- newGauge("uncleanable-bytes",
+ metricsGroup.newGauge("uncleanable-bytes",
() => inLock(lock) {
uncleanablePartitions.get(dir.getAbsolutePath) match {
case Some(partitions) =>
@@ -114,17 +116,17 @@ private[log] class LogCleanerManager(val logDirs:
Seq[File],
case None => 0
}
},
- Map("logDirectory" -> dir.getAbsolutePath)
+ Map("logDirectory" -> dir.getAbsolutePath).asJava
)
}
/* a gauge for tracking the cleanable ratio of the dirtiest log */
@volatile private var dirtiestLogCleanableRatio = 0.0
- newGauge("max-dirty-percent", () => (100 * dirtiestLogCleanableRatio).toInt)
+ metricsGroup.newGauge("max-dirty-percent", () => (100 *
dirtiestLogCleanableRatio).toInt)
/* a gauge for tracking the time since the last log cleaner run, in milli
seconds */
@volatile private var timeOfLastRun: Long = Time.SYSTEM.milliseconds
- newGauge("time-since-last-run-ms", () => Time.SYSTEM.milliseconds -
timeOfLastRun)
+ metricsGroup.newGauge("time-since-last-run-ms", () =>
Time.SYSTEM.milliseconds - timeOfLastRun)
/**
* @return the position processed for all logs.
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index d2b9ec56f08..381872acd2f 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,7 +23,6 @@ import java.io._
import java.nio.file.Files
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.ConfigRepository
import kafka.server._
@@ -42,6 +41,7 @@ import org.apache.kafka.common.config.TopicConfig
import java.util.Properties
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
@@ -76,10 +76,12 @@ class LogManager(logDirs: Seq[File],
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel,
time: Time,
- val keepPartitionMetadataFile: Boolean) extends Logging with
KafkaMetricsGroup {
+ val keepPartitionMetadataFile: Boolean) extends Logging {
import LogManager._
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
val InitialTaskDelayMs = 30 * 1000
private val logCreationOrDeletionLock = new Object
@@ -132,12 +134,12 @@ class LogManager(logDirs: Seq[File],
@volatile private var _cleaner: LogCleaner = _
private[kafka] def cleaner: LogCleaner = _cleaner
- newGauge("OfflineLogDirectoryCount", () => offlineLogDirs.size)
+ metricsGroup.newGauge("OfflineLogDirectoryCount", () => offlineLogDirs.size)
for (dir <- logDirs) {
- newGauge("LogDirectoryOffline",
+ metricsGroup.newGauge("LogDirectoryOffline",
() => if (_liveLogDirs.contains(dir)) 0 else 1,
- Map("logDirectory" -> dir.getAbsolutePath))
+ Map("logDirectory" -> dir.getAbsolutePath).asJava)
}
/**
@@ -471,12 +473,12 @@ class LogManager(logDirs: Seq[File],
numRemainingSegments:
ConcurrentMap[String, Int]): Unit = {
debug("Adding log recovery metrics")
for (dir <- logDirs) {
- newGauge("remainingLogsToRecover", () =>
numRemainingLogs.get(dir.getAbsolutePath),
- Map("dir" -> dir.getAbsolutePath))
+ metricsGroup.newGauge("remainingLogsToRecover", () =>
numRemainingLogs.get(dir.getAbsolutePath),
+ Map("dir" -> dir.getAbsolutePath).asJava)
for (i <- 0 until numRecoveryThreadsPerDataDir) {
val threadName = logRecoveryThreadName(dir.getAbsolutePath, i)
- newGauge("remainingSegmentsToRecover", () =>
numRemainingSegments.get(threadName),
- Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString))
+ metricsGroup.newGauge("remainingSegmentsToRecover", () =>
numRemainingSegments.get(threadName),
+ Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString).asJava)
}
}
}
@@ -484,9 +486,9 @@ class LogManager(logDirs: Seq[File],
private[log] def removeLogRecoveryMetrics(): Unit = {
debug("Removing log recovery metrics")
for (dir <- logDirs) {
- removeMetric("remainingLogsToRecover", Map("dir" -> dir.getAbsolutePath))
+ metricsGroup.removeMetric("remainingLogsToRecover", Map("dir" ->
dir.getAbsolutePath).asJava)
for (i <- 0 until numRecoveryThreadsPerDataDir) {
- removeMetric("remainingSegmentsToRecover", Map("dir" ->
dir.getAbsolutePath, "threadNum" -> i.toString))
+ metricsGroup.removeMetric("remainingSegmentsToRecover", Map("dir" ->
dir.getAbsolutePath, "threadNum" -> i.toString).asJava)
}
}
}
@@ -575,9 +577,9 @@ class LogManager(logDirs: Seq[File],
def shutdown(): Unit = {
info("Shutting down.")
- removeMetric("OfflineLogDirectoryCount")
+ metricsGroup.removeMetric("OfflineLogDirectoryCount")
for (dir <- logDirs) {
- removeMetric("LogDirectoryOffline", Map("logDirectory" ->
dir.getAbsolutePath))
+ metricsGroup.removeMetric("LogDirectoryOffline", Map("logDirectory" ->
dir.getAbsolutePath).asJava)
}
val threadPools = ArrayBuffer.empty[ExecutorService]
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala
b/core/src/main/scala/kafka/log/LogSegment.scala
index 7b2215de911..21a2a8c58fd 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -18,13 +18,13 @@ package kafka.log
import com.yammer.metrics.core.Timer
import kafka.common.LogSegmentOffsetOverflowException
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition,
TimestampAndOffset}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
CompletedTxn, FetchDataInfo, LazyIndex, LogConfig, LogOffsetMetadata,
OffsetIndex, OffsetPosition, ProducerStateManager, RollParams, TimeIndex,
TimestampOffset, TransactionIndex, TxnIndexSearchResult}
@@ -688,6 +688,7 @@ object LogSegment {
}
}
-object LogFlushStats extends KafkaMetricsGroup {
- val logFlushTimer: Timer = newTimer("LogFlushRateAndTimeMs",
TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+object LogFlushStats {
+ private val metricsGroup = new KafkaMetricsGroup(LogFlushStats.getClass)
+ val logFlushTimer: Timer = metricsGroup.newTimer("LogFlushRateAndTimeMs",
TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 1d001d87e3c..4463c2f2096 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -20,7 +20,6 @@ package kafka.log
import com.yammer.metrics.core.MetricName
import kafka.common.{OffsetsOutOfOrderException,
UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.{BrokerTopicMetrics, BrokerTopicStats,
PartitionMetadataFile, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.errors._
@@ -36,6 +35,7 @@ import org.apache.kafka.common.{InvalidRecordException,
KafkaException, TopicPar
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
@@ -44,6 +44,7 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn,
AppendOrigin, BatchMe
import java.io.{File, IOException}
import java.nio.file.Files
+import java.util
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.annotation.nowarn
@@ -105,10 +106,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val keepPartitionMetadataFile: Boolean,
val remoteStorageSystemEnable: Boolean = false,
remoteLogManager: Option[RemoteLogManager] = None,
- @volatile private var logOffsetsListener: LogOffsetsListener
= LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging with
KafkaMetricsGroup {
+ @volatile private var logOffsetsListener: LogOffsetsListener
= LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
import kafka.log.UnifiedLog._
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
+ // For compatibility, metrics are defined to be under `Log` class
+ override def metricName(name: String, tags: util.Map[String, String]):
MetricName = {
+ KafkaMetricsGroup.explicitMetricName(getClass.getPackage.getName, "Log",
name, tags)
+ }
+ }
+
this.logIdent = s"[UnifiedLog partition=$topicPartition, dir=$parentDir] "
/* A lock that guards all modifications to the log */
@@ -420,16 +428,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
- private var metricNames: Map[String, Map[String, String]] = Map.empty
+ private var metricNames: Map[String, java.util.Map[String, String]] =
Map.empty
newMetrics()
private[log] def newMetrics(): Unit = {
- val tags = Map("topic" -> topicPartition.topic, "partition" ->
topicPartition.partition.toString) ++
- (if (isFuture) Map("is-future" -> "true") else Map.empty)
- newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags)
- newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags)
- newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags)
- newGauge(LogMetricNames.Size, () => size, tags)
+ val tags = (Map("topic" -> topicPartition.topic, "partition" ->
topicPartition.partition.toString) ++
+ (if (isFuture) Map("is-future" -> "true") else Map.empty)).asJava
+ metricsGroup.newGauge(LogMetricNames.NumLogSegments, () =>
numberOfSegments, tags)
+ metricsGroup.newGauge(LogMetricNames.LogStartOffset, () => logStartOffset,
tags)
+ metricsGroup.newGauge(LogMetricNames.LogEndOffset, () => logEndOffset,
tags)
+ metricsGroup.newGauge(LogMetricNames.Size, () => size, tags)
metricNames = Map(LogMetricNames.NumLogSegments -> tags,
LogMetricNames.LogStartOffset -> tags,
LogMetricNames.LogEndOffset -> tags,
@@ -447,13 +455,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}
- // For compatibility, metrics are defined to be under `Log` class
- override def metricName(name: String, tags: scala.collection.Map[String,
String]): MetricName = {
- val pkg = getClass.getPackage
- val pkgStr = if (pkg == null) "" else pkg.getName
- explicitMetricName(pkgStr, "Log", name, tags)
- }
-
def loadProducerState(lastOffset: Long): Unit = lock synchronized {
rebuildProducerState(lastOffset, producerStateManager)
maybeIncrementFirstUnstableOffset()
@@ -1699,7 +1700,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
private[log] def removeLogMetrics(): Unit = {
metricNames.foreach {
- case (name, tags) => removeMetric(name, tags)
+ case (name, tags) => metricsGroup.removeMetric(name, tags)
}
metricNames = Map.empty
}
diff --git a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
index 8394c74afbd..a0fa0058000 100644
--- a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
+++ b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
@@ -17,7 +17,6 @@
package kafka.log.remote
import kafka.cluster.Partition
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.common._
@@ -48,7 +47,7 @@ import scala.jdk.CollectionConverters._
*/
class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
brokerId: Int,
- logDir: String) extends Logging with Closeable with
KafkaMetricsGroup {
+ logDir: String) extends Logging with Closeable {
// topic ids received on leadership changes
private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new
ConcurrentHashMap[TopicPartition, Uuid]()
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
deleted file mode 100644
index 8a9ed89f3fc..00000000000
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.metrics
-
-import java.util.concurrent.TimeUnit
-
-import com.yammer.metrics.core.{Gauge, Histogram, Meter, MetricName, Timer}
-import kafka.utils.Logging
-import org.apache.kafka.common.utils.Sanitizer
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
-
-trait KafkaMetricsGroup extends Logging {
-
- /**
- * Creates a new MetricName object for gauges, meters, etc. created for this
- * metrics group.
- * @param name Descriptive name of the metric.
- * @param tags Additional attributes which mBean will have.
- * @return Sanitized metric name object.
- */
- def metricName(name: String, tags: scala.collection.Map[String, String]):
MetricName = {
- val klass = this.getClass
- val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
- val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
-
- explicitMetricName(pkg, simpleName, name, tags)
- }
-
-
- def explicitMetricName(group: String, typeName: String, name: String,
- tags: scala.collection.Map[String,
String]): MetricName = {
-
- val nameBuilder: StringBuilder = new StringBuilder
-
- nameBuilder.append(group)
-
- nameBuilder.append(":type=")
-
- nameBuilder.append(typeName)
-
- if (name.nonEmpty) {
- nameBuilder.append(",name=")
- nameBuilder.append(name)
- }
-
- val scope: String = toScope(tags).orNull
- val tagsName = toMBeanName(tags)
- tagsName.foreach(nameBuilder.append(",").append(_))
-
- new MetricName(group, typeName, name, scope, nameBuilder.toString)
- }
-
- def newGauge[T](name: String, metric: Gauge[T], tags:
scala.collection.Map[String, String] = Map.empty): Gauge[T] =
- KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags),
metric)
-
- def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags:
scala.collection.Map[String, String] = Map.empty): Meter =
- KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, tags),
eventType, timeUnit)
-
- def newMeter(metricName: MetricName, eventType: String, timeUnit: TimeUnit):
Meter =
- KafkaYammerMetrics.defaultRegistry().newMeter(metricName, eventType,
timeUnit)
-
- def newHistogram(name: String, biased: Boolean = true, tags:
scala.collection.Map[String, String] = Map.empty): Histogram =
- KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags),
biased)
-
- def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags:
scala.collection.Map[String, String] = Map.empty): Timer =
- KafkaYammerMetrics.defaultRegistry().newTimer(metricName(name, tags),
durationUnit, rateUnit)
-
- def removeMetric(name: String, tags: scala.collection.Map[String, String] =
Map.empty): Unit =
- KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name, tags))
-
- private def toMBeanName(tags: collection.Map[String, String]):
Option[String] = {
- val filteredTags = tags.filter { case (_, tagValue) => tagValue != "" }
- if (filteredTags.nonEmpty) {
- val tagsString = filteredTags.map { case (key, value) =>
"%s=%s".format(key, Sanitizer.jmxSanitize(value)) }.mkString(",")
- Some(tagsString)
- }
- else None
- }
-
- private def toScope(tags: collection.Map[String, String]): Option[String] = {
- val filteredTags = tags.filter { case (_, tagValue) => tagValue != ""}
- if (filteredTags.nonEmpty) {
- // convert dot to _ since reporters like Graphite typically use dot to
represent hierarchy
- val tagsString = filteredTags
- .toList.sortBy(_._1)
- .map { case (key, value) => "%s.%s".format(key,
value.replaceAll("\\.", "_"))}
- .mkString(".")
-
- Some(tagsString)
- }
- else None
- }
-
-}
-
-object KafkaMetricsGroup extends KafkaMetricsGroup
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 8686d15a1d3..0c5cc373f5d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -23,7 +23,6 @@ import java.util.concurrent._
import com.fasterxml.jackson.databind.JsonNode
import com.typesafe.scalalogging.Logger
import com.yammer.metrics.core.Meter
-import kafka.metrics.KafkaMetricsGroup
import kafka.network
import kafka.server.KafkaConfig
import kafka.utils.{Logging, NotNothing, Pool}
@@ -37,7 +36,9 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import java.util
import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -344,16 +345,19 @@ object RequestChannel extends Logging {
class RequestChannel(val queueSize: Int,
val metricNamePrefix: String,
time: Time,
- val metrics: RequestChannel.Metrics) extends
KafkaMetricsGroup {
+ val metrics: RequestChannel.Metrics) {
import RequestChannel._
+
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
val requestQueueSizeMetricName =
metricNamePrefix.concat(RequestQueueSizeMetric)
val responseQueueSizeMetricName =
metricNamePrefix.concat(ResponseQueueSizeMetric)
- newGauge(requestQueueSizeMetricName, () => requestQueue.size)
+ metricsGroup.newGauge(requestQueueSizeMetricName, () => requestQueue.size)
- newGauge(responseQueueSizeMetricName, () => {
+ metricsGroup.newGauge(responseQueueSizeMetricName, () => {
processors.values.asScala.foldLeft(0) {(total, processor) =>
total + processor.responseQueueSize
}
@@ -363,13 +367,13 @@ class RequestChannel(val queueSize: Int,
if (processors.putIfAbsent(processor.id, processor) != null)
warn(s"Unexpected processor with processorId ${processor.id}")
- newGauge(responseQueueSizeMetricName, () => processor.responseQueueSize,
- Map(ProcessorMetricTag -> processor.id.toString))
+ metricsGroup.newGauge(responseQueueSizeMetricName, () =>
processor.responseQueueSize,
+ Map(ProcessorMetricTag -> processor.id.toString).asJava)
}
def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId)
- removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag ->
processorId.toString))
+ metricsGroup.removeMetric(responseQueueSizeMetricName,
Map(ProcessorMetricTag -> processorId.toString).asJava)
}
/** Send a request to be handled, potentially blocking until there is room
in the queue for the request */
@@ -497,51 +501,59 @@ object RequestMetrics {
val ErrorsPerSec = "ErrorsPerSec"
}
-class RequestMetrics(name: String) extends KafkaMetricsGroup {
+class RequestMetrics(name: String) {
import RequestMetrics._
- val tags = Map("request" -> name)
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
+ val tags = Map("request" -> name).asJava
val requestRateInternal = new Pool[Short, Meter]()
// time a request spent in a request queue
- val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true,
tags)
+ val requestQueueTimeHist = metricsGroup.newHistogram(RequestQueueTimeMs,
true, tags)
// time a request takes to be processed at the local broker
- val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags)
+ val localTimeHist = metricsGroup.newHistogram(LocalTimeMs, true, tags)
// time a request takes to wait on remote brokers (currently only relevant
to fetch and produce requests)
- val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags)
+ val remoteTimeHist = metricsGroup.newHistogram(RemoteTimeMs, true, tags)
// time a request is throttled, not part of the request processing time
(throttling is done at the client level
// for clients that support KIP-219 and by muting the channel for the rest)
- val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags)
+ val throttleTimeHist = metricsGroup.newHistogram(ThrottleTimeMs, true, tags)
// time a response spent in a response queue
- val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true,
tags)
+ val responseQueueTimeHist = metricsGroup.newHistogram(ResponseQueueTimeMs,
true, tags)
// time to send the response to the requester
- val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true,
tags)
- val totalTimeHist = newHistogram(TotalTimeMs, biased = true, tags)
+ val responseSendTimeHist = metricsGroup.newHistogram(ResponseSendTimeMs,
true, tags)
+ val totalTimeHist = metricsGroup.newHistogram(TotalTimeMs, true, tags)
// request size in bytes
- val requestBytesHist = newHistogram(RequestBytes, biased = true, tags)
+ val requestBytesHist = metricsGroup.newHistogram(RequestBytes, true, tags)
// time for message conversions (only relevant to fetch and produce requests)
val messageConversionsTimeHist =
if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
- Some(newHistogram(MessageConversionsTimeMs, biased = true, tags))
+ Some(metricsGroup.newHistogram(MessageConversionsTimeMs, true, tags))
else
None
// Temporary memory allocated for processing request (only populated for
fetch and produce requests)
// This shows the memory allocated for compression/conversions excluding the
actual request size
val tempMemoryBytesHist =
if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
- Some(newHistogram(TemporaryMemoryBytes, biased = true, tags))
+ Some(metricsGroup.newHistogram(TemporaryMemoryBytes, true, tags))
else
None
private val errorMeters = mutable.Map[Errors, ErrorMeter]()
Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name,
error)))
- def requestRate(version: Short): Meter = {
- requestRateInternal.getAndMaybePut(version, newMeter(RequestsPerSec,
"requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+ def requestRate(version: Short): Meter =
+ requestRateInternal.getAndMaybePut(version,
metricsGroup.newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS,
tagsWithVersion(version)))
+
+ private def tagsWithVersion(version: Short): java.util.Map[String, String] =
{
+ val nameAndVersionTags = new util.HashMap[String, String](tags.size() + 1)
+ nameAndVersionTags.putAll(tags)
+ nameAndVersionTags.put("version", version.toString)
+ nameAndVersionTags
}
class ErrorMeter(name: String, error: Errors) {
- private val tags = Map("request" -> name, "error" -> error.name)
+ private val tags = Map("request" -> name, "error" -> error.name).asJava
@volatile private var meter: Meter = _
@@ -551,7 +563,7 @@ class RequestMetrics(name: String) extends
KafkaMetricsGroup {
else {
synchronized {
if (meter == null)
- meter = newMeter(ErrorsPerSec, "requests", TimeUnit.SECONDS, tags)
+ meter = metricsGroup.newMeter(ErrorsPerSec, "requests",
TimeUnit.SECONDS, tags)
meter
}
}
@@ -560,7 +572,7 @@ class RequestMetrics(name: String) extends
KafkaMetricsGroup {
def removeMeter(): Unit = {
synchronized {
if (meter != null) {
- removeMetric(ErrorsPerSec, tags)
+ metricsGroup.removeMetric(ErrorsPerSec, tags)
meter = null
}
}
@@ -572,19 +584,21 @@ class RequestMetrics(name: String) extends
KafkaMetricsGroup {
}
def removeMetrics(): Unit = {
- for (version <- requestRateInternal.keys) removeMetric(RequestsPerSec,
tags + ("version" -> version.toString))
- removeMetric(RequestQueueTimeMs, tags)
- removeMetric(LocalTimeMs, tags)
- removeMetric(RemoteTimeMs, tags)
- removeMetric(RequestsPerSec, tags)
- removeMetric(ThrottleTimeMs, tags)
- removeMetric(ResponseQueueTimeMs, tags)
- removeMetric(TotalTimeMs, tags)
- removeMetric(ResponseSendTimeMs, tags)
- removeMetric(RequestBytes, tags)
+ for (version <- requestRateInternal.keys) {
+ metricsGroup.removeMetric(RequestsPerSec, tagsWithVersion(version))
+ }
+ metricsGroup.removeMetric(RequestQueueTimeMs, tags)
+ metricsGroup.removeMetric(LocalTimeMs, tags)
+ metricsGroup.removeMetric(RemoteTimeMs, tags)
+ metricsGroup.removeMetric(RequestsPerSec, tags)
+ metricsGroup.removeMetric(ThrottleTimeMs, tags)
+ metricsGroup.removeMetric(ResponseQueueTimeMs, tags)
+ metricsGroup.removeMetric(TotalTimeMs, tags)
+ metricsGroup.removeMetric(ResponseSendTimeMs, tags)
+ metricsGroup.removeMetric(RequestBytes, tags)
if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) {
- removeMetric(MessageConversionsTimeMs, tags)
- removeMetric(TemporaryMemoryBytes, tags)
+ metricsGroup.removeMetric(MessageConversionsTimeMs, tags)
+ metricsGroup.removeMetric(TemporaryMemoryBytes, tags)
}
errorMeters.values.foreach(_.removeMeter())
errorMeters.clear()
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 6807a2a82fc..feca5fc68b4 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -25,7 +25,6 @@ import java.util
import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.cluster.{BrokerEndPoint, EndPoint}
-import kafka.metrics.KafkaMetricsGroup
import kafka.network.ConnectionQuotas._
import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse,
EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
@@ -47,6 +46,7 @@ import org.apache.kafka.common.requests.{ApiVersionsRequest,
RequestContext, Req
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName,
Reconfigurable}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.FutureUtils
import org.slf4j.event.Level
@@ -78,7 +78,9 @@ class SocketServer(val config: KafkaConfig,
val time: Time,
val credentialProvider: CredentialProvider,
val apiVersionManager: ApiVersionManager)
- extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
+ extends Logging with BrokerReconfigurable {
+
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
private val maxQueuedRequests = config.queuedMaxRequests
@@ -115,7 +117,7 @@ class SocketServer(val config: KafkaConfig,
private var stopped = false
// Socket server metrics
- newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
() => SocketServer.this.synchronized {
+
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
() => SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a =>
a.processors)
val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
@@ -129,7 +131,7 @@ class SocketServer(val config: KafkaConfig,
}
})
if (config.requiresZookeeper) {
-
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
() => SocketServer.this.synchronized {
+
metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
() => SocketServer.this.synchronized {
val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a =>
a.processors(0))
val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
@@ -139,9 +141,9 @@ class SocketServer(val config: KafkaConfig,
}.getOrElse(Double.NaN)
})
}
- newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
- newGauge("MemoryPoolUsed", () => memoryPool.size() -
memoryPool.availableMemory)
- newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
() => SocketServer.this.synchronized {
+ metricsGroup.newGauge("MemoryPoolAvailable", () =>
memoryPool.availableMemory)
+ metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() -
memoryPool.availableMemory)
+
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
() => SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a =>
a.processors)
val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p
=>
metrics.metricName("expired-connections-killed-count", MetricsGroup,
p.metricTags)
@@ -151,7 +153,7 @@ class SocketServer(val config: KafkaConfig,
}.sum
})
if (config.requiresZookeeper) {
-
newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
() => SocketServer.this.synchronized {
+
metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
() => SocketServer.this.synchronized {
val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a =>
a.processors(0))
val expiredConnectionsKilledCountMetricNames =
controlPlaneProcessorOpt.map { p =>
metrics.metricName("expired-connections-killed-count", MetricsGroup,
p.metricTags)
@@ -579,7 +581,9 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
logContext: LogContext,
memoryPool: MemoryPool,
apiVersionManager: ApiVersionManager)
- extends Runnable with Logging with KafkaMetricsGroup {
+ extends Runnable with Logging {
+
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val shouldRun = new AtomicBoolean(true)
@@ -608,12 +612,12 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private[network] val processors = new ArrayBuffer[Processor]()
// Build the metric name explicitly in order to keep the existing name for
compatibility
- private val blockedPercentMeterMetricName = explicitMetricName(
+ private val blockedPercentMeterMetricName =
KafkaMetricsGroup.explicitMetricName(
"kafka.network",
"Acceptor",
s"${metricPrefix()}AcceptorBlockedPercent",
- Map(ListenerMetricTag -> endPoint.listenerName.value))
- private val blockedPercentMeter =
newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
+ Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
+ private val blockedPercentMeter =
metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time",
TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new
mutable.PriorityQueue[DelayedCloseSocket]()
private var started = false
@@ -911,7 +915,9 @@ private[kafka] class Processor(
isPrivilegedListener: Boolean,
apiVersionManager: ApiVersionManager,
threadName: String
-) extends Runnable with KafkaMetricsGroup {
+) extends Runnable with Logging {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
val shouldRun = new AtomicBoolean(true)
val thread = KafkaThread.nonDaemon(threadName, this)
@@ -940,13 +946,13 @@ private[kafka] class Processor(
NetworkProcessorMetricTag -> id.toString
).asJava
- newGauge(IdlePercentMetricName, () => {
+ metricsGroup.newGauge(IdlePercentMetricName, () => {
Option(metrics.metric(metrics.metricName("io-wait-ratio", MetricsGroup,
metricTags))).fold(0.0)(m =>
Math.min(m.metricValue.asInstanceOf[Double], 1.0))
},
// for compatibility, only add a networkProcessor tag to the Yammer
Metrics alias (the equivalent Selector metric
// also includes the listener name)
- Map(NetworkProcessorMetricTag -> id.toString)
+ Map(NetworkProcessorMetricTag -> id.toString).asJava
)
val expiredConnectionsKilledCount = new CumulativeSum()
@@ -1299,7 +1305,7 @@ private[kafka] class Processor(
close(channel.id)
}
selector.close()
- removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag ->
id.toString))
+ metricsGroup.removeMetric(IdlePercentMetricName,
Map(NetworkProcessorMetricTag -> id.toString).asJava)
}
// 'protected` to allow override for testing
@@ -1367,7 +1373,7 @@ private[kafka] class Processor(
beginShutdown()
thread.join()
} finally {
- removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
+ metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" ->
id.toString).asJava)
metrics.removeMetric(expiredConnectionsKilledCountMetricName)
}
}
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index ddc45693f87..eced213734b 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -18,16 +18,19 @@
package kafka.server
import kafka.cluster.BrokerEndPoint
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import scala.collection.{Map, Set, mutable}
+import scala.jdk.CollectionConverters._
abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name:
String, clientId: String, numFetchers: Int)
- extends Logging with KafkaMetricsGroup {
+ extends Logging {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
// map of (source broker_id, fetcher_id per source broker) => fetcher.
// package private for test
private[server] val fetcherThreadMap = new
mutable.HashMap[BrokerIdAndFetcherId, T]
@@ -36,9 +39,9 @@ abstract class AbstractFetcherManager[T <:
AbstractFetcherThread](val name: Stri
val failedPartitions = new FailedPartitions
this.logIdent = "[" + name + "] "
- private val tags = Map("clientId" -> clientId)
+ private val tags = Map("clientId" -> clientId).asJava
- newGauge("MaxLag", () => {
+ metricsGroup.newGauge("MaxLag", () => {
// current max lag across all fetchers/topics/partitions
fetcherThreadMap.values.foldLeft(0L) { (curMaxLagAll, fetcherThread) =>
val maxLagThread =
fetcherThread.fetcherLagStats.stats.values.foldLeft(0L)((curMaxLagThread,
lagMetrics) =>
@@ -47,16 +50,16 @@ abstract class AbstractFetcherManager[T <:
AbstractFetcherThread](val name: Stri
}
}, tags)
- newGauge("MinFetchRate", () => {
+ metricsGroup.newGauge("MinFetchRate", () => {
// current min fetch rate across all fetchers/topics/partitions
val headRate =
fetcherThreadMap.values.headOption.map(_.fetcherStats.requestRate.oneMinuteRate).getOrElse(0.0)
fetcherThreadMap.values.foldLeft(headRate)((curMinAll, fetcherThread) =>
math.min(curMinAll,
fetcherThread.fetcherStats.requestRate.oneMinuteRate))
}, tags)
- newGauge("FailedPartitionsCount", () => failedPartitions.size, tags)
+ metricsGroup.newGauge("FailedPartitionsCount", () => failedPartitions.size,
tags)
- newGauge("DeadThreadCount", () => deadThreadCount, tags)
+ metricsGroup.newGauge("DeadThreadCount", () => deadThreadCount, tags)
private[server] def deadThreadCount: Int = lock synchronized {
fetcherThreadMap.values.count(_.isThreadFailed) }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2176ee3518b..a1c5d0ed2fa 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.common.ClientIdAndBroker
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
import kafka.utils.CoreUtils.inLock
import kafka.utils.Implicits._
@@ -34,6 +33,7 @@ import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
import org.apache.kafka.common.requests._
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
import org.apache.kafka.server.common.OffsetAndEpoch
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -858,15 +858,16 @@ object FetcherMetrics {
val BytesPerSec = "BytesPerSec"
}
-class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends
KafkaMetricsGroup {
+class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
private[this] val lagVal = new AtomicLong(-1L)
private[this] val tags = Map(
"clientId" -> metricId.clientId,
"topic" -> metricId.topicPartition.topic,
- "partition" -> metricId.topicPartition.partition.toString)
+ "partition" -> metricId.topicPartition.partition.toString).asJava
- newGauge(FetcherMetrics.ConsumerLag, () => lagVal.get, tags)
+ metricsGroup.newGauge(FetcherMetrics.ConsumerLag, () => lagVal.get, tags)
def lag_=(newLag: Long): Unit = {
lagVal.set(newLag)
@@ -875,7 +876,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition)
extends KafkaMetricsGr
def lag = lagVal.get
def unregister(): Unit = {
- removeMetric(FetcherMetrics.ConsumerLag, tags)
+ metricsGroup.removeMetric(FetcherMetrics.ConsumerLag, tags)
}
}
@@ -899,18 +900,20 @@ class FetcherLagStats(metricId: ClientIdAndBroker) {
}
}
-class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
+class FetcherStats(metricId: ClientIdAndBroker) {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
val tags = Map("clientId" -> metricId.clientId,
"brokerHost" -> metricId.brokerHost,
- "brokerPort" -> metricId.brokerPort.toString)
+ "brokerPort" -> metricId.brokerPort.toString).asJava
- val requestRate = newMeter(FetcherMetrics.RequestsPerSec, "requests",
TimeUnit.SECONDS, tags)
+ val requestRate = metricsGroup.newMeter(FetcherMetrics.RequestsPerSec,
"requests", TimeUnit.SECONDS, tags)
- val byteRate = newMeter(FetcherMetrics.BytesPerSec, "bytes",
TimeUnit.SECONDS, tags)
+ val byteRate = metricsGroup.newMeter(FetcherMetrics.BytesPerSec, "bytes",
TimeUnit.SECONDS, tags)
def unregister(): Unit = {
- removeMetric(FetcherMetrics.RequestsPerSec, tags)
- removeMetric(FetcherMetrics.BytesPerSec, tags)
+ metricsGroup.removeMetric(FetcherMetrics.RequestsPerSec, tags)
+ metricsGroup.removeMetric(FetcherMetrics.BytesPerSec, tags)
}
}
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 9f844e1958a..f078815734d 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -20,7 +20,6 @@ import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import kafka.api.LeaderAndIsr
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Logging
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse
@@ -123,7 +122,7 @@ class DefaultAlterPartitionManager(
val brokerId: Int,
val brokerEpochSupplier: () => Long,
val metadataVersionSupplier: () => MetadataVersion
-) extends AlterPartitionManager with Logging with KafkaMetricsGroup {
+) extends AlterPartitionManager with Logging {
// Used to allow only one pending ISR update per partition (visible for
testing).
// Note that we key items by TopicPartition despite using TopicIdPartition
while
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index b1a15d97741..c231486e41e 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -18,7 +18,7 @@
package kafka.server
import kafka.cluster.Broker.ServerInfo
-import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
+import kafka.metrics.LinuxIoMetricsCollector
import kafka.migration.MigrationPropagator
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.KafkaRaftManager
@@ -44,7 +44,7 @@ import
org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagat
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
import org.apache.kafka.server.util.{Deadline, FutureUtils}
@@ -80,10 +80,12 @@ class ControllerServer(
val sharedServer: SharedServer,
val configSchema: KafkaConfigSchema,
val bootstrapMetadata: BootstrapMetadata,
-) extends Logging with KafkaMetricsGroup {
+) extends Logging {
import kafka.server.Server._
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
val config = sharedServer.controllerConfig
val time = sharedServer.time
def metrics = sharedServer.metrics
@@ -134,13 +136,13 @@ class ControllerServer(
maybeChangeStatus(STARTING, STARTED)
this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}]
").logPrefix()
- newGauge("ClusterId", () => clusterId)
- newGauge("yammer-metrics-count", () =>
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+ metricsGroup.newGauge("ClusterId", () => clusterId)
+ metricsGroup.newGauge("yammer-metrics-count", () =>
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time,
logger.underlying)
if (linuxIoMetricsCollector.usable()) {
- newGauge("linux-disk-read-bytes", () =>
linuxIoMetricsCollector.readBytes())
- newGauge("linux-disk-write-bytes", () =>
linuxIoMetricsCollector.writeBytes())
+ metricsGroup.newGauge("linux-disk-read-bytes", () =>
linuxIoMetricsCollector.readBytes())
+ metricsGroup.newGauge("linux-disk-write-bytes", () =>
linuxIoMetricsCollector.writeBytes())
}
val javaListeners = config.controllerListeners.map(_.toJava).asJava
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 317d0b89c37..ef0eceeef94 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -20,12 +20,12 @@ package kafka.server
import java.util.concurrent.TimeUnit
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Implicits._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.DeleteRecordsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.DeleteRecordsResponse
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import scala.collection._
@@ -122,12 +122,12 @@ class DelayedDeleteRecords(delayMs: Long,
}
}
-object DelayedDeleteRecordsMetrics extends KafkaMetricsGroup {
+object DelayedDeleteRecordsMetrics {
+ private val metricsGroup = new
KafkaMetricsGroup(DelayedDeleteRecordsMetrics.getClass)
- private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests",
TimeUnit.SECONDS)
+ private val aggregateExpirationMeter =
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
def recordExpiration(partition: TopicPartition): Unit = {
aggregateExpirationMeter.mark()
}
}
-
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 9106cff910c..423b7bcd223 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -18,15 +18,16 @@
package kafka.server
import java.util.concurrent.TimeUnit
-import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams,
FetchPartitionData, LogOffsetMetadata}
import scala.collection._
+import scala.jdk.CollectionConverters._
case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata,
fetchInfo: PartitionData) {
@@ -180,9 +181,10 @@ class DelayedFetch(
}
}
-object DelayedFetchMetrics extends KafkaMetricsGroup {
+object DelayedFetchMetrics {
+ private val metricsGroup = new
KafkaMetricsGroup(DelayedFetchMetrics.getClass)
private val FetcherTypeKey = "fetcherType"
- val followerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests",
TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "follower"))
- val consumerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests",
TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "consumer"))
+ val followerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec",
"requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "follower").asJava)
+ val consumerExpiredRequestMeter = metricsGroup.newMeter("ExpiresPerSec",
"requests", TimeUnit.SECONDS, Map(FetcherTypeKey -> "consumer").asJava)
}
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala
b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 8b2d52fcd31..3bdd00b22e5 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -20,14 +20,15 @@ package kafka.server
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.concurrent.locks.{Lock, ReentrantLock}
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
import kafka.utils._
import kafka.utils.timer._
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import scala.collection._
import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._
/**
* An operation whose processing needs to be delayed for at most the given
delayMs. For example
@@ -153,7 +154,9 @@ final class DelayedOperationPurgatory[T <:
DelayedOperation](purgatoryName: Stri
purgeInterval:
Int = 1000,
reaperEnabled:
Boolean = true,
timerEnabled:
Boolean = true)
- extends Logging with KafkaMetricsGroup {
+ extends Logging {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
/* a list of operation watching keys */
private class WatcherList {
val watchersByKey = new Pool[Any, Watchers](Some((key: Any) => new
Watchers(key)))
@@ -180,9 +183,9 @@ final class DelayedOperationPurgatory[T <:
DelayedOperation](purgatoryName: Stri
/* background thread expiring operations that have timed out */
private val expirationReaper = new ExpiredOperationReaper()
- private val metricsTags = Map("delayedOperation" -> purgatoryName)
- newGauge("PurgatorySize", () => watched, metricsTags)
- newGauge("NumDelayedOperations", () => numDelayed, metricsTags)
+ private val metricsTags = Map("delayedOperation" -> purgatoryName).asJava
+ metricsGroup.newGauge("PurgatorySize", () => watched, metricsTags)
+ metricsGroup.newGauge("NumDelayedOperations", () => numDelayed, metricsTags)
if (reaperEnabled)
expirationReaper.start()
@@ -338,8 +341,8 @@ final class DelayedOperationPurgatory[T <:
DelayedOperation](purgatoryName: Stri
expirationReaper.awaitShutdown()
}
timeoutTimer.shutdown()
- removeMetric("PurgatorySize", metricsTags)
- removeMetric("NumDelayedOperations", metricsTags)
+ metricsGroup.removeMetric("PurgatorySize", metricsTags)
+ metricsGroup.removeMetric("NumDelayedOperations", metricsTags)
}
/**
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala
b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 5e7e7bf385c..48a0ccbcff4 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -21,14 +21,15 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Meter
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Implicits._
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import scala.collection._
+import scala.jdk.CollectionConverters._
case class ProducePartitionStatus(requiredOffset: Long, responseStatus:
PartitionResponse) {
@volatile var acksPending = false
@@ -129,15 +130,16 @@ class DelayedProduce(delayMs: Long,
}
}
-object DelayedProduceMetrics extends KafkaMetricsGroup {
+object DelayedProduceMetrics {
+ private val metricsGroup = new
KafkaMetricsGroup(DelayedProduceMetrics.getClass)
- private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests",
TimeUnit.SECONDS)
+ private val aggregateExpirationMeter =
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
private val partitionExpirationMeterFactory = (key: TopicPartition) =>
- newMeter("ExpiresPerSec",
+ metricsGroup.newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
- tags = Map("topic" -> key.topic, "partition" ->
key.partition.toString))
+ Map("topic" -> key.topic, "partition" ->
key.partition.toString).asJava)
private val partitionExpirationMeters = new Pool[TopicPartition,
Meter](valueFactory = Some(partitionExpirationMeterFactory))
def recordExpiration(partition: TopicPartition): Unit = {
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index a715718c420..62bd83c72c5 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -25,7 +25,6 @@ import java.util.Base64
import javax.crypto.spec.SecretKeySpec
import javax.crypto.{Mac, SecretKey}
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{CoreUtils, Json, Logging}
import kafka.zk.{DelegationTokenChangeNotificationSequenceZNode,
DelegationTokenChangeNotificationZNode, DelegationTokensZNode, KafkaZkClient}
import org.apache.kafka.common.protocol.Errors
@@ -167,7 +166,7 @@ object DelegationTokenManager {
class DelegationTokenManager(val config: KafkaConfig,
val tokenCache: DelegationTokenCache,
val time: Time,
- val zkClient: KafkaZkClient) extends Logging with
KafkaMetricsGroup {
+ val zkClient: KafkaZkClient) extends Logging {
this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "
import DelegationTokenManager._
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala
b/core/src/main/scala/kafka/server/FetchSession.scala
index b233a5ae69e..8725a09f090 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -17,7 +17,6 @@
package kafka.server
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.Logging
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.FetchResponseData
@@ -25,11 +24,12 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH,
INITIAL_EPOCH, INVALID_SESSION_ID}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse,
FetchMetadata => JFetchMetadata}
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time,
Utils}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util
-import java.util.Optional
+import java.util.{Collections, Optional}
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
-import scala.collection.{mutable, _}
+import scala.collection.mutable
import scala.math.Ordered.orderingToOrdered
object FetchSession {
@@ -564,7 +564,9 @@ case class EvictableKey(privileged: Boolean, size: Int, id:
Int) extends Compara
* @param evictionMs The minimum time that an entry must be unused in order
to be evictable.
*/
class FetchSessionCache(private val maxEntries: Int,
- private val evictionMs: Long) extends Logging with
KafkaMetricsGroup {
+ private val evictionMs: Long) extends Logging {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
private var numPartitions: Long = 0
// A map of session ID to FetchSession.
@@ -581,13 +583,13 @@ class FetchSessionCache(private val maxEntries: Int,
private val evictableByPrivileged = new util.TreeMap[EvictableKey,
FetchSession]
// Set up metrics.
- removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
- newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () =>
FetchSessionCache.this.size)
- removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
- newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () =>
FetchSessionCache.this.totalPartitions)
- removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
- private[server] val evictionsMeter =
newMeter(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC,
- FetchSession.EVICTIONS, TimeUnit.SECONDS, Map.empty)
+ metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
+ metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () =>
FetchSessionCache.this.size)
+
metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
+ metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED,
() => FetchSessionCache.this.totalPartitions)
+
metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
+ private[server] val evictionsMeter =
metricsGroup.newMeter(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC,
+ FetchSession.EVICTIONS, TimeUnit.SECONDS, Collections.emptyMap())
/**
* Get a session by session ID.
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index db5da9bd906..cb66a9eb089 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -19,9 +19,10 @@ package kafka.server
import com.yammer.metrics.core.MetricName
import kafka.log.LogManager
-import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
+import kafka.metrics.LinuxIoMetricsCollector
import kafka.network.SocketServer
import kafka.security.CredentialProvider
+import kafka.utils.Logging
import org.apache.kafka.common.ClusterResource
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
@@ -30,9 +31,10 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.Scheduler
+import java.util
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -66,7 +68,7 @@ object KafkaBroker {
val STARTED_MESSAGE = "Kafka Server started"
}
-trait KafkaBroker extends KafkaMetricsGroup {
+trait KafkaBroker extends Logging {
def authorizer: Option[Authorizer]
def brokerState: BrokerState
@@ -91,20 +93,22 @@ trait KafkaBroker extends KafkaMetricsGroup {
def credentialProvider: CredentialProvider
def clientToControllerChannelManager: BrokerToControllerChannelManager
- // For backwards compatibility, we need to keep older metrics tied
- // to their original name when this class was named `KafkaServer`
- override def metricName(name: String, metricTags:
scala.collection.Map[String, String]): MetricName = {
- explicitMetricName(Server.MetricsPrefix, KafkaBroker.MetricsTypeName,
name, metricTags)
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass) {
+ // For backwards compatibility, we need to keep older metrics tied
+ // to their original name when this class was named `KafkaServer`
+ override def metricName(name: String, tags: util.Map[String, String]):
MetricName = {
+ KafkaMetricsGroup.explicitMetricName(Server.MetricsPrefix,
KafkaBroker.MetricsTypeName, name, tags)
+ }
}
- newGauge("BrokerState", () => brokerState.value)
- newGauge("ClusterId", () => clusterId)
- newGauge("yammer-metrics-count", () =>
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+ metricsGroup.newGauge("BrokerState", () => brokerState.value)
+ metricsGroup.newGauge("ClusterId", () => clusterId)
+ metricsGroup.newGauge("yammer-metrics-count", () =>
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
private val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc",
Time.SYSTEM, logger.underlying)
if (linuxIoMetricsCollector.usable()) {
- newGauge("linux-disk-read-bytes", () =>
linuxIoMetricsCollector.readBytes())
- newGauge("linux-disk-write-bytes", () =>
linuxIoMetricsCollector.writeBytes())
+ metricsGroup.newGauge("linux-disk-read-bytes", () =>
linuxIoMetricsCollector.readBytes())
+ metricsGroup.newGauge("linux-disk-write-bytes", () =>
linuxIoMetricsCollector.writeBytes())
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 4d38c6efc3f..bd5ea797fe2 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -19,14 +19,15 @@ package kafka.server
import kafka.network._
import kafka.utils._
-import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.core.Meter
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.{KafkaThread, Time}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import java.util.Collections
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -109,11 +110,12 @@ class KafkaRequestHandlerPool(val brokerId: Int,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
- logAndThreadNamePrefix : String) extends Logging
with KafkaMetricsGroup {
+ logAndThreadNamePrefix : String) extends Logging
{
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
- private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName,
"percent", TimeUnit.NANOSECONDS)
+ private val aggregateIdleMeter =
metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent",
TimeUnit.NANOSECONDS)
this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on
Broker " + brokerId + "], "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
@@ -151,10 +153,12 @@ class KafkaRequestHandlerPool(val brokerId: Int,
}
}
-class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
- val tags: scala.collection.Map[String, String] = name match {
- case None => Map.empty
- case Some(topic) => Map("topic" -> topic)
+class BrokerTopicMetrics(name: Option[String]) {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
+ val tags: java.util.Map[String, String] = name match {
+ case None => Collections.emptyMap()
+ case Some(topic) => Map("topic" -> topic).asJava
}
case class MeterWrapper(metricType: String, eventType: String) {
@@ -167,7 +171,7 @@ class BrokerTopicMetrics(name: Option[String]) extends
KafkaMetricsGroup {
meterLock synchronized {
meter = lazyMeter
if (meter == null) {
- meter = newMeter(metricType, eventType, TimeUnit.SECONDS, tags)
+ meter = metricsGroup.newMeter(metricType, eventType,
TimeUnit.SECONDS, tags)
lazyMeter = meter
}
}
@@ -177,7 +181,7 @@ class BrokerTopicMetrics(name: Option[String]) extends
KafkaMetricsGroup {
def close(): Unit = meterLock synchronized {
if (lazyMeter != null) {
- removeMetric(metricType, tags)
+ metricsGroup.removeMetric(metricType, tags)
lazyMeter = null
}
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4ef13903983..1fd21104208 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -22,7 +22,6 @@ import kafka.cluster.{BrokerEndPoint, Partition,
PartitionListener}
import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log.remote.RemoteLogManager
import kafka.log.{LogManager, UnifiedLog}
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile,
OffsetCheckpoints}
@@ -57,6 +56,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo,
FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig,
LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.io.File
import java.nio.file.{Files, Paths}
@@ -194,7 +194,8 @@ class ReplicaManager(val config: KafkaConfig,
delayedDeleteRecordsPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
delayedElectLeaderPurgatoryParam:
Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
threadNamePrefix: Option[String] = None,
- ) extends Logging with KafkaMetricsGroup {
+ ) extends Logging {
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
DelayedOperationPurgatory[DelayedProduce](
@@ -246,16 +247,16 @@ class ReplicaManager(val config: KafkaConfig,
// Visible for testing
private[server] val replicaSelectorOpt: Option[ReplicaSelector] =
createReplicaSelector()
- newGauge("LeaderCount", () => leaderPartitionsIterator.size)
+ metricsGroup.newGauge("LeaderCount", () => leaderPartitionsIterator.size)
// Visible for testing
- private[kafka] val partitionCount = newGauge("PartitionCount", () =>
allPartitions.size)
- newGauge("OfflineReplicaCount", () => offlinePartitionCount)
- newGauge("UnderReplicatedPartitions", () => underReplicatedPartitionCount)
- newGauge("UnderMinIsrPartitionCount", () =>
leaderPartitionsIterator.count(_.isUnderMinIsr))
- newGauge("AtMinIsrPartitionCount", () =>
leaderPartitionsIterator.count(_.isAtMinIsr))
- newGauge("ReassigningPartitions", () => reassigningPartitionsCount)
- newGauge("PartitionsWithLateTransactionsCount", () => lateTransactionsCount)
- newGauge("ProducerIdCount", () => producerIdCount)
+ private[kafka] val partitionCount = metricsGroup.newGauge("PartitionCount",
() => allPartitions.size)
+ metricsGroup.newGauge("OfflineReplicaCount", () => offlinePartitionCount)
+ metricsGroup.newGauge("UnderReplicatedPartitions", () =>
underReplicatedPartitionCount)
+ metricsGroup.newGauge("UnderMinIsrPartitionCount", () =>
leaderPartitionsIterator.count(_.isUnderMinIsr))
+ metricsGroup.newGauge("AtMinIsrPartitionCount", () =>
leaderPartitionsIterator.count(_.isAtMinIsr))
+ metricsGroup.newGauge("ReassigningPartitions", () =>
reassigningPartitionsCount)
+ metricsGroup.newGauge("PartitionsWithLateTransactionsCount", () =>
lateTransactionsCount)
+ metricsGroup.newGauge("ProducerIdCount", () => producerIdCount)
def reassigningPartitionsCount: Int =
leaderPartitionsIterator.count(_.isReassigning)
@@ -266,9 +267,9 @@ class ReplicaManager(val config: KafkaConfig,
def producerIdCount: Int =
onlinePartitionsIterator.map(_.producerIdCount).sum
- val isrExpandRate: Meter = newMeter("IsrExpandsPerSec", "expands",
TimeUnit.SECONDS)
- val isrShrinkRate: Meter = newMeter("IsrShrinksPerSec", "shrinks",
TimeUnit.SECONDS)
- val failedIsrUpdatesRate: Meter = newMeter("FailedIsrUpdatesPerSec",
"failedUpdates", TimeUnit.SECONDS)
+ val isrExpandRate: Meter = metricsGroup.newMeter("IsrExpandsPerSec",
"expands", TimeUnit.SECONDS)
+ val isrShrinkRate: Meter = metricsGroup.newMeter("IsrShrinksPerSec",
"shrinks", TimeUnit.SECONDS)
+ val failedIsrUpdatesRate: Meter =
metricsGroup.newMeter("FailedIsrUpdatesPerSec", "failedUpdates",
TimeUnit.SECONDS)
def underReplicatedPartitionCount: Int =
leaderPartitionsIterator.count(_.isUnderReplicated)
@@ -1921,15 +1922,15 @@ class ReplicaManager(val config: KafkaConfig,
}
def removeMetrics(): Unit = {
- removeMetric("LeaderCount")
- removeMetric("PartitionCount")
- removeMetric("OfflineReplicaCount")
- removeMetric("UnderReplicatedPartitions")
- removeMetric("UnderMinIsrPartitionCount")
- removeMetric("AtMinIsrPartitionCount")
- removeMetric("ReassigningPartitions")
- removeMetric("PartitionsWithLateTransactionsCount")
- removeMetric("ProducerIdCount")
+ metricsGroup.removeMetric("LeaderCount")
+ metricsGroup.removeMetric("PartitionCount")
+ metricsGroup.removeMetric("OfflineReplicaCount")
+ metricsGroup.removeMetric("UnderReplicatedPartitions")
+ metricsGroup.removeMetric("UnderMinIsrPartitionCount")
+ metricsGroup.removeMetric("AtMinIsrPartitionCount")
+ metricsGroup.removeMetric("ReassigningPartitions")
+ metricsGroup.removeMetric("PartitionsWithLateTransactionsCount")
+ metricsGroup.removeMetric("ProducerIdCount")
}
def beginControlledShutdown(): Unit = {
diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala
b/core/src/main/scala/kafka/server/ZkAdminManager.scala
index 77e42b21dd6..97cf2b2f713 100644
--- a/core/src/main/scala/kafka/server/ZkAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala
@@ -20,7 +20,6 @@ import java.util
import java.util.Properties
import kafka.admin.{AdminOperationException, AdminUtils}
import kafka.common.TopicAlreadyMarkedForDeletionException
-import kafka.metrics.KafkaMetricsGroup
import kafka.server.ConfigAdminManager.{prepareIncrementalConfigs,
toLoggableProps}
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.metadata.ZkConfigRepository
@@ -69,7 +68,7 @@ object ZkAdminManager {
class ZkAdminManager(val config: KafkaConfig,
val metrics: Metrics,
val metadataCache: MetadataCache,
- val zkClient: KafkaZkClient) extends Logging with
KafkaMetricsGroup {
+ val zkClient: KafkaZkClient) extends Logging {
this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 789ae89f049..a12f25de471 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -16,10 +16,11 @@
*/
package kafka.server.metadata
+import kafka.utils.Logging
+
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.CompletableFuture
-import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
@@ -42,7 +43,7 @@ class BrokerMetadataListener(
val snapshotter: Option[MetadataSnapshotter],
brokerMetrics: BrokerServerMetrics,
_metadataLoadingFaultHandler: FaultHandler
-) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+) extends RaftClient.Listener[ApiMessageAndVersion] with Logging {
private val metadataFaultOccurred = new AtomicBoolean(false)
private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 6b7701485e2..212909101f2 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -17,8 +17,6 @@
package kafka.server.metadata
-import kafka.metrics.KafkaMetricsGroup
-
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.Gauge
@@ -26,19 +24,20 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.MetricConfig
import org.apache.kafka.image.MetadataProvenance
import org.apache.kafka.image.loader.MetadataLoaderMetrics
-import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
+import java.util.Collections
import java.util.concurrent.TimeUnit.NANOSECONDS
final class BrokerServerMetrics private (
metrics: Metrics
-) extends MetadataLoaderMetrics with KafkaMetricsGroup {
+) extends MetadataLoaderMetrics {
import BrokerServerMetrics._
- private val batchProcessingTimeHistName = explicitMetricName("kafka.server",
+ private val batchProcessingTimeHistName =
KafkaMetricsGroup.explicitMetricName("kafka.server",
"BrokerMetadataListener",
"MetadataBatchProcessingTimeUs",
- Map.empty)
+ Collections.emptyMap())
/**
* A histogram tracking the time in microseconds it took to process batches
of events.
@@ -46,10 +45,10 @@ final class BrokerServerMetrics private (
private val batchProcessingTimeHist =
KafkaYammerMetrics.defaultRegistry().newHistogram(batchProcessingTimeHistName,
true)
- private val batchSizeHistName = explicitMetricName("kafka.server",
+ private val batchSizeHistName =
KafkaMetricsGroup.explicitMetricName("kafka.server",
"BrokerMetadataListener",
"MetadataBatchSizes",
- Map.empty)
+ Collections.emptyMap())
/**
* A histogram tracking the sizes of batches that we have processed.
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 067e8aee84c..65c1013fcbf 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.CountDownLatch
import java.util.regex.Pattern
import java.util.{Collections, Properties}
import kafka.consumer.BaseConsumerRecord
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
@@ -34,6 +33,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
@@ -61,7 +61,8 @@ import scala.util.{Failure, Success, Try}
* @deprecated Since 3.0, use the Connect-based MirrorMaker instead (aka MM2).
*/
@deprecated(message = "Use the Connect-based MirrorMaker instead (aka MM2).",
since = "3.0")
-object MirrorMaker extends Logging with KafkaMetricsGroup {
+object MirrorMaker extends Logging {
+ private val metricsGroup = new KafkaMetricsGroup(MirrorMaker.getClass)
private[tools] var producer: MirrorMakerProducer = _
private var mirrorMakerThreads: Seq[MirrorMakerThread] = _
@@ -78,7 +79,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// If a message send failed after retries are exhausted. The offset of the
messages will also be removed from
// the unacked offset list to avoid offset commit being stuck on that
offset. In this case, the offset of that
// message was not really acked, but was skipped. This metric records the
number of skipped offsets.
- newGauge("MirrorMaker-numDroppedMessages", () => numDroppedMessages.get())
+ metricsGroup.newGauge("MirrorMaker-numDroppedMessages", () =>
numDroppedMessages.get())
def main(args: Array[String]): Unit = {
@@ -185,7 +186,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
class MirrorMakerThread(consumerWrapper: ConsumerWrapper,
- val threadId: Int) extends Thread with Logging with
KafkaMetricsGroup {
+ val threadId: Int) extends Thread with Logging {
private val threadName = "mirrormaker-thread-" + threadId
private val shutdownLatch: CountDownLatch = new CountDownLatch(1)
private var lastOffsetCommitMs = System.currentTimeMillis()
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala
b/core/src/main/scala/kafka/utils/Throttler.scala
index a431db5f006..824c7dcfc28 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -17,8 +17,8 @@
package kafka.utils
-import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
import java.util.Random
@@ -41,10 +41,12 @@ class Throttler(@volatile var desiredRatePerSec: Double,
throttleDown: Boolean = true,
metricName: String = "throttler",
units: String = "entries",
- time: Time = Time.SYSTEM) extends Logging with
KafkaMetricsGroup {
-
+ time: Time = Time.SYSTEM) extends Logging {
+
+ private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+
private val lock = new Object
- private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
+ private val meter = metricsGroup.newMeter(metricName, units,
TimeUnit.SECONDS)
private val checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs)
private var periodStartNs: Long = time.nanoseconds
private var observedSoFar: Double = 0.0
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 78dda799f17..056cc15a7e9 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,6 @@ import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch,
ReplicaAssignment}
-import kafka.metrics.KafkaMetricsGroup
import kafka.security.authorizer.AclAuthorizer.{NoAcls, VersionedAcls}
import kafka.security.authorizer.AclEntry
import kafka.server.{ConfigType, KafkaConfig}
@@ -35,6 +34,7 @@ import
org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.OpResult.{CheckResult, CreateResult, ErrorResult,
SetDataResult}
@@ -43,6 +43,7 @@ import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper}
+import java.util
import java.lang.{Long => JLong}
import scala.collection.{Map, Seq, mutable}
@@ -59,13 +60,15 @@ case class SuccessfulRegistrationResult(zkControllerEpoch:
Int, controllerEpochZ
* monolithic [[kafka.zk.ZkData]] is the way to go.
*/
class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure:
Boolean, time: Time) extends AutoCloseable with
- Logging with KafkaMetricsGroup {
+ Logging {
- override def metricName(name: String, metricTags:
scala.collection.Map[String, String]): MetricName = {
- explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name,
metricTags)
+ private val metricsGroup: KafkaMetricsGroup = new
KafkaMetricsGroup(this.getClass) {
+ override def metricName(name: String, metricTags: util.Map[String,
String]): MetricName = {
+ KafkaMetricsGroup.explicitMetricName("kafka.server",
"ZooKeeperClientMetrics", name, metricTags)
+ }
}
- private val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs")
+ private val latencyMetric =
metricsGroup.newHistogram("ZooKeeperRequestLatencyMs")
import KafkaZkClient._
@@ -1627,7 +1630,7 @@ class KafkaZkClient private[zk] (zooKeeperClient:
ZooKeeperClient, isSecure: Boo
* Close the underlying ZooKeeperClient.
*/
def close(): Unit = {
- removeMetric("ZooKeeperRequestLatencyMs")
+ metricsGroup.removeMetric("ZooKeeperRequestLatencyMs")
zooKeeperClient.close()
}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 531cf468adc..04c9184791c 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -22,12 +22,12 @@ import java.util.concurrent.locks.{ReentrantLock,
ReentrantReadWriteLock}
import java.util.concurrent._
import java.util.{List => JList}
import com.yammer.metrics.core.MetricName
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.Logging
import kafka.zookeeper.ZooKeeperClient._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.zookeeper.AsyncCallback.{Children2Callback, DataCallback,
StatCallback}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
@@ -36,6 +36,7 @@ import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper._
import org.apache.zookeeper.client.ZKClientConfig
+import java.util
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.collection.mutable.Set
@@ -62,7 +63,14 @@ class ZooKeeperClient(connectString: String,
metricGroup: String,
metricType: String,
private[zookeeper] val clientConfig: ZKClientConfig,
- name: String) extends Logging with KafkaMetricsGroup {
+ name: String) extends Logging {
+
+ private val metricsGroup: KafkaMetricsGroup = new
KafkaMetricsGroup(this.getClass) {
+ override def metricName(name: String, metricTags: util.Map[String,
String]): MetricName = {
+ KafkaMetricsGroup.explicitMetricName(metricGroup, metricType, name,
metricTags)
+ }
+ }
+
this.logIdent = s"[ZooKeeperClient $name] "
private val initializationLock = new ReentrantReadWriteLock()
@@ -91,7 +99,7 @@ class ZooKeeperClient(connectString: String,
stateToEventTypeMap.map { case (state, eventType) =>
val name = s"ZooKeeper${eventType}PerSec"
metricNames += name
- state -> newMeter(name, eventType.toLowerCase(Locale.ROOT),
TimeUnit.SECONDS)
+ state -> metricsGroup.newMeter(name, eventType.toLowerCase(Locale.ROOT),
TimeUnit.SECONDS)
}
}
@@ -100,7 +108,7 @@ class ZooKeeperClient(connectString: String,
@volatile private var zooKeeper = new ZooKeeper(connectString,
sessionTimeoutMs, ZooKeeperClientWatcher,
clientConfig)
- newGauge("SessionState", () => connectionState.toString)
+ metricsGroup.newGauge("SessionState", () => connectionState.toString)
metricNames += "SessionState"
@@ -112,10 +120,6 @@ class ZooKeeperClient(connectString: String,
throw e
}
- override def metricName(name: String, metricTags:
scala.collection.Map[String, String]): MetricName = {
- explicitMetricName(metricGroup, metricType, name, metricTags)
- }
-
/**
* Return the state of the ZooKeeper connection.
*/
@@ -344,7 +348,7 @@ class ZooKeeperClient(connectString: String,
zNodeChildChangeHandlers.clear()
stateChangeHandlers.clear()
zooKeeper.close()
- metricNames.foreach(removeMetric(_))
+ metricNames.foreach(metricsGroup.removeMetric(_))
}
info("Closed.")
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3052e166a94..e24530918b5 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -2572,7 +2572,7 @@ class GroupMetadataManagerTest {
}
private def getGauge(manager: GroupMetadataManager, name: String):
Gauge[Int] = {
-
KafkaYammerMetrics.defaultRegistry().allMetrics().get(manager.metricName(name,
Map.empty)).asInstanceOf[Gauge[Int]]
+
KafkaYammerMetrics.defaultRegistry().allMetrics().get(manager.metricsGroup.metricName(name,
Collections.emptyMap())).asInstanceOf[Gauge[Int]]
}
private def expectMetrics(manager: GroupMetadataManager,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index ae09f7633cc..efdcae25355 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -20,7 +20,6 @@ package kafka.log
import java.io.PrintWriter
import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.metrics.KafkaMetricsGroup
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.{CompressionType, RecordBatch}
@@ -34,7 +33,7 @@ import scala.jdk.CollectionConverters._
/**
* This is an integration test that tests the fully integrated log cleaner
*/
-class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with
KafkaMetricsGroup {
+class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest {
val codec: CompressionType = CompressionType.LZ4
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index f0c9256d506..fdfa61c10e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1888,7 +1888,7 @@ class LogCleanerTest {
time = time)
def checkGauge(name: String): Unit = {
- val gauge = logCleaner.newGauge(name, () => 999)
+ val gauge = logCleaner.metricsGroup.newGauge(name, () => 999)
// if there is no cleaners, 0 is default value
assertEquals(0, gauge.value())
}
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a1f842c6c67..465bca9af1b 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -679,10 +679,10 @@ class LogManagerTest {
}
private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager,
expectedParams: Map[String, Int]): Unit = {
- val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+ val logManagerClassName = classOf[LogManager].getSimpleName
// get all `remainingLogsToRecover` metrics
val logMetrics: ArrayBuffer[Gauge[Int]] =
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
- .filter { case (metric, _) => metric.getType ==
s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" }
+ .filter { case (metric, _) => metric.getType == s"$logManagerClassName"
&& metric.getName == "remainingLogsToRecover" }
.map { case (_, gauge) => gauge }
.asInstanceOf[ArrayBuffer[Gauge[Int]]]
@@ -709,10 +709,10 @@ class LogManagerTest {
recoveryThreadsPerDataDir: Int,
mockMap:
ConcurrentHashMap[String, Int],
expectedParams:
Map[String, Int]): Unit = {
- val spyLogManagerClassName = spyLogManager.getClass().getSimpleName
+ val logManagerClassName = classOf[LogManager].getSimpleName
// get all `remainingSegmentsToRecover` metrics
val logSegmentMetrics: ArrayBuffer[Gauge[Int]] =
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
- .filter { case (metric, _) => metric.getType ==
s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
+ .filter { case (metric, _) => metric.getType ==
s"$logManagerClassName" && metric.getName == "remainingSegmentsToRecover" }
.map { case (_, gauge) => gauge }
.asInstanceOf[ArrayBuffer[Gauge[Int]]]
diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
index 918553589d6..2b0c46c5c46 100644
--- a/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/KafkaMetricsGroupTest.scala
@@ -17,18 +17,23 @@
package kafka.metrics
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
+
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
import org.junit.jupiter.api.Test
+import java.util.Collections
+import scala.jdk.CollectionConverters._
+
class KafkaMetricsGroupTest {
@Test
def testUntaggedMetricName(): Unit = {
val metricName = KafkaMetricsGroup.explicitMetricName(
- group = "kafka.metrics",
- typeName = "TestMetrics",
- name = "TaggedMetric",
- Map.empty
+ "kafka.metrics",
+ "TestMetrics",
+ "TaggedMetric",
+ Collections.emptyMap()
)
assertEquals("kafka.metrics", metricName.getGroup)
@@ -41,11 +46,11 @@ class KafkaMetricsGroupTest {
@Test
def testTaggedMetricName(): Unit = {
- val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz")
+ val tags = Map("foo" -> "bar", "bar" -> "baz", "baz" -> "raz.taz").asJava
val metricName = KafkaMetricsGroup.explicitMetricName(
- group = "kafka.metrics",
- typeName = "TestMetrics",
- name = "TaggedMetric",
+ "kafka.metrics",
+ "TestMetrics",
+ "TaggedMetric",
tags
)
@@ -59,11 +64,11 @@ class KafkaMetricsGroupTest {
@Test
def testTaggedMetricNameWithEmptyValue(): Unit = {
- val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz")
+ val tags = Map("foo" -> "bar", "bar" -> "", "baz" -> "raz.taz").asJava
val metricName = KafkaMetricsGroup.explicitMetricName(
- group = "kafka.metrics",
- typeName = "TestMetrics",
- name = "TaggedMetric",
+ "kafka.metrics",
+ "TestMetrics",
+ "TaggedMetric",
tags
)
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 3131eb57985..db619e2e0a2 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metrics.JmxReporter
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
@@ -158,7 +159,7 @@ class MetricsTest extends KafkaServerTestHarness with
Logging {
val path = "C:\\windows-path\\kafka-logs"
val tags = Map("dir" -> path)
val expectedMBeanName = Set(tags.keySet.head,
ObjectName.quote(path)).mkString("=")
- val metric = KafkaMetricsGroup.metricName("test-metric", tags)
+ val metric = new
KafkaMetricsGroup(this.getClass).metricName("test-metric", tags.asJava)
assert(metric.getMBeanName.endsWith(expectedMBeanName))
}
diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
index 2ebcc37306d..725435cc67d 100644
--- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
@@ -22,7 +22,6 @@ import java.util
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}
import java.util.{Collections, Properties}
import com.yammer.metrics.core.Meter
-import kafka.metrics.KafkaMetricsGroup
import kafka.network.Processor.ListenerMetricTag
import kafka.server.KafkaConfig
import kafka.utils.Implicits.MapExtensionMethods
@@ -33,6 +32,7 @@ import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Metrics}
import org.apache.kafka.common.network._
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
@@ -88,8 +88,8 @@ class ConnectionQuotasTest {
TestUtils.clearYammerMetrics()
listeners.keys.foreach { name =>
- blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter(
- s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS,
Map(ListenerMetricTag -> name)))
+ blockedPercentMeters.put(name, new
KafkaMetricsGroup(this.getClass).newMeter(
+ s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS,
Map(ListenerMetricTag -> name).asJava))
}
// use system time, because ConnectionQuota causes the current thread to
wait with timeout, which waits based on
// system time; so using mock time will likely result in test flakiness
due to a mixed use of mock and system time
diff --git
a/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
new file mode 100644
index 00000000000..4560cd3f22c
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+ private final Class<?> klass;
+
+ public KafkaMetricsGroup(Class<?> klass) {
+ this.klass = klass;
+ }
+
+ /**
+ * Creates a new MetricName object for gauges, meters, etc. created for
this
+ * metrics group.
+ * @param name Descriptive name of the metric.
+ * @param tags Additional attributes which mBean will have.
+ * @return Sanitized metric name object.
+ */
+ public MetricName metricName(String name, Map<String, String> tags) {
+ String pkg = klass.getPackage() == null ? "" :
klass.getPackage().getName();
+ String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
+ return explicitMetricName(pkg, simpleName, name, tags);
+ }
+
+ public static MetricName explicitMetricName(String group, String typeName,
+ String name, Map<String,
String> tags) {
+ StringBuilder nameBuilder = new StringBuilder(100);
+ nameBuilder.append(group);
+ nameBuilder.append(":type=");
+ nameBuilder.append(typeName);
+
+ if (!name.isEmpty()) {
+ nameBuilder.append(",name=");
+ nameBuilder.append(name);
+ }
+
+ String scope = toScope(tags).orElse(null);
+ Optional<String> tagsName = toMBeanName(tags);
+ tagsName.ifPresent(s -> nameBuilder.append(",").append(s));
+
+ return new MetricName(group, typeName, name, scope,
nameBuilder.toString());
+ }
+
+ public final <T> Gauge<T> newGauge(String name, Gauge<T> metric,
Map<String, String> tags) {
+ return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name,
tags), metric);
+ }
+
+ public final <T> Gauge<T> newGauge(String name, Gauge<T> metric) {
+ return newGauge(name, metric, Collections.emptyMap());
+ }
+
+ public final Meter newMeter(String name, String eventType,
+ TimeUnit timeUnit, Map<String, String> tags) {
+ return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name,
tags), eventType, timeUnit);
+ }
+
+ public final Meter newMeter(String name, String eventType,
+ TimeUnit timeUnit) {
+ return newMeter(name, eventType, timeUnit, Collections.emptyMap());
+ }
+
+ public final Meter newMeter(MetricName metricName, String eventType,
TimeUnit timeUnit) {
+ return KafkaYammerMetrics.defaultRegistry().newMeter(metricName,
eventType, timeUnit);
+ }
+
+ public final Histogram newHistogram(String name, boolean biased,
Map<String, String> tags) {
+ return
KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags),
biased);
+ }
+
+ public final Histogram newHistogram(String name) {
+ return newHistogram(name, true, Collections.emptyMap());
+ }
+
+ public final Timer newTimer(String name, TimeUnit durationUnit, TimeUnit
rateUnit, Map<String, String> tags) {
+ return KafkaYammerMetrics.defaultRegistry().newTimer(metricName(name,
tags), durationUnit, rateUnit);
+ }
+
+ public final Timer newTimer(String name, TimeUnit durationUnit, TimeUnit
rateUnit) {
+ return newTimer(name, durationUnit, rateUnit, Collections.emptyMap());
+ }
+
+ public final void removeMetric(String name, Map<String, String> tags) {
+ KafkaYammerMetrics.defaultRegistry().removeMetric(metricName(name,
tags));
+ }
+
+ public final void removeMetric(String name) {
+ removeMetric(name, Collections.emptyMap());
+ }
+
+ private static Optional<String> toMBeanName(Map<String, String> tags) {
+ List<Map.Entry<String, String>> filteredTags = tags.entrySet().stream()
+ .filter(entry -> !entry.getValue().equals(""))
+ .collect(Collectors.toList());
+ if (!filteredTags.isEmpty()) {
+ String tagsString = filteredTags.stream()
+ .map(entry -> entry.getKey() + "=" +
Sanitizer.jmxSanitize(entry.getValue()))
+ .collect(Collectors.joining(","));
+ return Optional.of(tagsString);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private static Optional<String> toScope(Map<String, String> tags) {
+ List<Map.Entry<String, String>> filteredTags = tags.entrySet().stream()
+ .filter(entry -> !entry.getValue().equals(""))
+ .collect(Collectors.toList());
+ if (!filteredTags.isEmpty()) {
+ // convert dot to _ since reporters like Graphite typically use
dot to represent hierarchy
+ String tagsString = filteredTags.stream()
+ .sorted(Map.Entry.comparingByKey())
+ .map(entry -> entry.getKey() + "." +
entry.getValue().replaceAll("\\.", "_"))
+ .collect(Collectors.joining("."));
+ return Optional.of(tagsString);
+ } else {
+ return Optional.empty();
+ }
+ }
+}