This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8fe551139c KAFKA-19030 Remove metricNamePrefix from RequestChannel
(#19374)
c8fe551139c is described below
commit c8fe551139ca106165c0c0b42f6c7b1db03f490d
Author: Parker Chang <[email protected]>
AuthorDate: Sat Apr 12 23:22:40 2025 +0800
KAFKA-19030 Remove metricNamePrefix from RequestChannel (#19374)
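As described in the JIRA ticket, `controlPlaneRequestChannelOpt` was
removed from KRaft mode, so the metric name prefix is no longer needed:
the only remaining prefix, `DataPlaneAcceptor#MetricPrefix`, was the
empty string, so the emitted metric names do not change.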
This change removes `metricNamePrefix` from RequestChannel and the
related files.
It also removes `DataPlaneAcceptor#MetricPrefix`, since
`DataPlaneAcceptor` is the only implementation of `Acceptor`.
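Since the implementation of KIP-291 is essentially removed, we can also
remove `logAndThreadNamePrefix` and `DataPlaneAcceptor#ThreadPrefix`;
thread names and the request-handler log prefix now hard-code the former
value, `data-plane`, so they are unchanged as well.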
Reviewers: PoAn Yang <[email protected]>, Ken Huang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../main/scala/kafka/network/RequestChannel.scala | 11 ++++-------
core/src/main/scala/kafka/network/SocketServer.scala | 20 ++++++--------------
core/src/main/scala/kafka/server/BrokerServer.scala | 5 ++---
.../main/scala/kafka/server/ControllerServer.scala | 5 ++---
.../scala/kafka/server/KafkaRequestHandler.scala | 5 ++---
core/src/main/scala/kafka/tools/TestRaftServer.scala | 5 ++---
.../scala/kafka/server/KafkaRequestHandlerTest.scala | 8 ++++----
7 files changed, 22 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index a16e03a1916..dfb96ef8e11 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -340,7 +340,6 @@ object RequestChannel extends Logging {
}
class RequestChannel(val queueSize: Int,
- val metricNamePrefix: String,
time: Time,
val metrics: RequestChannelMetrics) {
import RequestChannel._
@@ -349,13 +348,11 @@ class RequestChannel(val queueSize: Int,
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
- private val requestQueueSizeMetricName =
metricNamePrefix.concat(RequestQueueSizeMetric)
- private val responseQueueSizeMetricName =
metricNamePrefix.concat(ResponseQueueSizeMetric)
private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
- metricsGroup.newGauge(requestQueueSizeMetricName, () => requestQueue.size)
+ metricsGroup.newGauge(RequestQueueSizeMetric, () => requestQueue.size)
- metricsGroup.newGauge(responseQueueSizeMetricName, () => {
+ metricsGroup.newGauge(ResponseQueueSizeMetric, () => {
processors.values.asScala.foldLeft(0) {(total, processor) =>
total + processor.responseQueueSize
}
@@ -365,13 +362,13 @@ class RequestChannel(val queueSize: Int,
if (processors.putIfAbsent(processor.id, processor) != null)
warn(s"Unexpected processor with processorId ${processor.id}")
- metricsGroup.newGauge(responseQueueSizeMetricName, () =>
processor.responseQueueSize,
+ metricsGroup.newGauge(ResponseQueueSizeMetric, () =>
processor.responseQueueSize,
Map(ProcessorMetricTag -> processor.id.toString).asJava)
}
def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId)
- metricsGroup.removeMetric(responseQueueSizeMetricName,
Map(ProcessorMetricTag -> processorId.toString).asJava)
+ metricsGroup.removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag
-> processorId.toString).asJava)
}
/** Send a request to be handled, potentially blocking until there is room
in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index c520b17fa06..79cd0bc8ce2 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -97,7 +97,7 @@ class SocketServer(
private val memoryPool = if (config.queuedMaxBytes > 0) new
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false,
memoryPoolSensor) else MemoryPool.NONE
// data-plane
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint,
DataPlaneAcceptor]()
- val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests,
DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
+ val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time,
apiVersionManager.newRequestMetrics)
private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@@ -113,7 +113,7 @@ class SocketServer(
private var stopped = false
// Socket server metrics
-
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
() => SocketServer.this.synchronized {
+ metricsGroup.newGauge(s"NetworkProcessorAvgIdlePercent", () =>
SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a =>
a.processors)
val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
@@ -129,7 +129,7 @@ class SocketServer(
metricsGroup.newGauge("MemoryPoolAvailable", () =>
memoryPool.availableMemory)
metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() -
memoryPool.availableMemory)
-
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
() => SocketServer.this.synchronized {
+ metricsGroup.newGauge(s"ExpiredConnectionsKilledCount", () =>
SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a =>
a.processors)
val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p
=>
metrics.metricName("expired-connections-killed-count", MetricsGroup,
p.metricTags)
@@ -370,8 +370,6 @@ object SocketServer {
}
object DataPlaneAcceptor {
- val ThreadPrefix: String = "data-plane"
- val MetricPrefix: String = ""
val ListenerReconfigurableConfigs: Set[String] =
Set(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
}
@@ -402,9 +400,6 @@ class DataPlaneAcceptor(socketServer: SocketServer,
memoryPool,
apiVersionManager) with ListenerReconfigurable {
- override def metricPrefix(): String = DataPlaneAcceptor.MetricPrefix
- override def threadPrefix(): String = DataPlaneAcceptor.ThreadPrefix
-
/**
* Returns the listener name associated with this reconfigurable.
Listener-specific
* configs corresponding to this listener name are provided for
reconfiguration.
@@ -495,9 +490,6 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
val shouldRun = new AtomicBoolean(true)
- def metricPrefix(): String
- def threadPrefix(): String
-
private val sendBufferSize = config.socketSendBufferBytes
private val recvBufferSize = config.socketReceiveBufferBytes
private val listenBacklogSize = config.socketListenBacklogSize
@@ -522,7 +514,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
// Build the metric name explicitly in order to keep the existing name for
compatibility
private val backwardCompatibilityMetricGroup = new
KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName =
backwardCompatibilityMetricGroup.metricName(
- s"${metricPrefix()}AcceptorBlockedPercent",
+ "AcceptorBlockedPercent",
Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
private val blockedPercentMeter =
metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time",
TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
@@ -531,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private[network] val startedFuture = new CompletableFuture[Void]()
val thread: KafkaThread = KafkaThread.nonDaemon(
-
s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
+
s"data-plane-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
this)
def start(): Unit = synchronized {
@@ -769,7 +761,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
connectionDisconnectListeners:
Seq[ConnectionDisconnectListener]): Processor = {
- val name =
s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
+ val name =
s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
new Processor(id,
time,
config.socketRequestMaxBytes,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 31ebc14c960..5f52e6abdcb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -21,7 +21,7 @@ import kafka.coordinator.group.{CoordinatorLoaderImpl,
CoordinatorPartitionWrite
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
-import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager
import kafka.server.metadata._
import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl,
SharePartitionManager}
@@ -470,8 +470,7 @@ class BrokerServer(
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
- config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
- DataPlaneAcceptor.ThreadPrefix)
+ config.numIoThreads, "RequestHandlerAvgIdlePercent")
// Start RemoteLogManager before initializing broker metadata publishers.
remoteLogManagerOpt.foreach { rlm =>
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index e7537878ca6..3ddf97d2705 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -17,7 +17,7 @@
package kafka.server
-import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers
@@ -285,8 +285,7 @@ class ControllerServer(
controllerApis,
time,
config.numIoThreads,
- s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
- DataPlaneAcceptor.ThreadPrefix,
+ "RequestHandlerAvgIdlePercent",
"controller")
// Set up the metadata cache publisher.
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index fa251e27529..815fe4966eb 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -199,7 +199,6 @@ class KafkaRequestHandlerPool(
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
- logAndThreadNamePrefix : String,
nodeName: String = "broker"
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@@ -208,7 +207,7 @@ class KafkaRequestHandlerPool(
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter =
metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent",
TimeUnit.NANOSECONDS)
- this.logIdent = s"[$logAndThreadNamePrefix Kafka Request Handler on
${nodeName.capitalize} $brokerId] "
+ this.logIdent = s"[data-plane Kafka Request Handler on
${nodeName.capitalize} $brokerId] "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
@@ -216,7 +215,7 @@ class KafkaRequestHandlerPool(
def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter,
threadPoolSize, requestChannel, apis, time, nodeName)
- KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" +
id, runnables(id)).start()
+ KafkaThread.daemon("data-plane-kafka-request-handler-" + id,
runnables(id)).start()
}
def resizeThreadPool(newSize: Int): Unit = synchronized {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 418a276bbd1..69d296fe467 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -21,7 +21,7 @@ import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch,
LinkedBlockingDeque, TimeUnit}
import joptsimple.{OptionException, OptionSpec}
-import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.network.SocketServer
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool,
SimpleApiVersionManager}
import kafka.utils.{CoreUtils, Logging}
@@ -130,8 +130,7 @@ class TestRaftServer(
requestHandler,
time,
config.numIoThreads,
- s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
- DataPlaneAcceptor.ThreadPrefix
+ "RequestHandlerAvgIdlePercent"
)
workloadGenerator.start()
diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
index 495ad0b1c00..6e8efa28b86 100644
--- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
+++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -57,7 +57,7 @@ class KafkaRequestHandlerTest {
val time = new MockTime()
val startTime = time.nanoseconds()
val metrics = new RequestChannelMetrics(Collections.emptySet[ApiKeys])
- val requestChannel = new RequestChannel(10, "", time, metrics)
+ val requestChannel = new RequestChannel(10, time, metrics)
val apiHandler = mock(classOf[ApiRequestHandler])
try {
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), requestChannel, apiHandler, time)
@@ -95,7 +95,7 @@ class KafkaRequestHandlerTest {
val time = new MockTime()
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
- val requestChannel = new RequestChannel(10, "", time, metrics)
+ val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), requestChannel, apiHandler, time)
var handledCount = 0
@@ -131,7 +131,7 @@ class KafkaRequestHandlerTest {
val time = new MockTime()
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
- val requestChannel = new RequestChannel(10, "", time, metrics)
+ val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), requestChannel, apiHandler, time)
val originalRequestLocal = mock(classOf[RequestLocal])
@@ -165,7 +165,7 @@ class KafkaRequestHandlerTest {
val time = new MockTime()
val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler])
- val requestChannel = new RequestChannel(10, "", time, metrics)
+ val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new
AtomicInteger(1), requestChannel, apiHandler, time)
val originalRequestLocal = mock(classOf[RequestLocal])