This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8fe551139c KAFKA-19030 Remove metricNamePrefix from RequestChannel 
(#19374)
c8fe551139c is described below

commit c8fe551139ca106165c0c0b42f6c7b1db03f490d
Author: Parker Chang <[email protected]>
AuthorDate: Sat Apr 12 23:22:40 2025 +0800

    KAFKA-19030 Remove metricNamePrefix from RequestChannel (#19374)
    
    As described in the JIRA ticket, `controlPlaneRequestChannelOpt` was
    removed from KRaft mode, so there's no need to use the metrics prefix
    anymore.
    
    This change removes `metricNamePrefix` from RequestChannel and the
    related files.
    
    It also removes `DataPlaneAcceptor#MetricPrefix`, since
    `DataPlaneAcceptor` is the only implementation of `Acceptor`.
    
    Since the implementation of KIP-291 is essentially removed, we can also
    remove `logAndThreadNamePrefix` and `DataPlaneAcceptor#ThreadPrefix`.
    
    Reviewers: PoAn Yang <[email protected]>, Ken Huang
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../main/scala/kafka/network/RequestChannel.scala    | 11 ++++-------
 core/src/main/scala/kafka/network/SocketServer.scala | 20 ++++++--------------
 core/src/main/scala/kafka/server/BrokerServer.scala  |  5 ++---
 .../main/scala/kafka/server/ControllerServer.scala   |  5 ++---
 .../scala/kafka/server/KafkaRequestHandler.scala     |  5 ++---
 core/src/main/scala/kafka/tools/TestRaftServer.scala |  5 ++---
 .../scala/kafka/server/KafkaRequestHandlerTest.scala |  8 ++++----
 7 files changed, 22 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index a16e03a1916..dfb96ef8e11 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -340,7 +340,6 @@ object RequestChannel extends Logging {
 }
 
 class RequestChannel(val queueSize: Int,
-                     val metricNamePrefix: String,
                      time: Time,
                      val metrics: RequestChannelMetrics) {
   import RequestChannel._
@@ -349,13 +348,11 @@ class RequestChannel(val queueSize: Int,
 
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
-  private val requestQueueSizeMetricName = 
metricNamePrefix.concat(RequestQueueSizeMetric)
-  private val responseQueueSizeMetricName = 
metricNamePrefix.concat(ResponseQueueSizeMetric)
   private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
 
-  metricsGroup.newGauge(requestQueueSizeMetricName, () => requestQueue.size)
+  metricsGroup.newGauge(RequestQueueSizeMetric, () => requestQueue.size)
 
-  metricsGroup.newGauge(responseQueueSizeMetricName, () => {
+  metricsGroup.newGauge(ResponseQueueSizeMetric, () => {
     processors.values.asScala.foldLeft(0) {(total, processor) =>
       total + processor.responseQueueSize
     }
@@ -365,13 +362,13 @@ class RequestChannel(val queueSize: Int,
     if (processors.putIfAbsent(processor.id, processor) != null)
       warn(s"Unexpected processor with processorId ${processor.id}")
 
-    metricsGroup.newGauge(responseQueueSizeMetricName, () => 
processor.responseQueueSize,
+    metricsGroup.newGauge(ResponseQueueSizeMetric, () => 
processor.responseQueueSize,
       Map(ProcessorMetricTag -> processor.id.toString).asJava)
   }
 
   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
-    metricsGroup.removeMetric(responseQueueSizeMetricName, 
Map(ProcessorMetricTag -> processorId.toString).asJava)
+    metricsGroup.removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag 
-> processorId.toString).asJava)
   }
 
   /** Send a request to be handled, potentially blocking until there is room 
in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index c520b17fa06..79cd0bc8ce2 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -97,7 +97,7 @@ class SocketServer(
   private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
   // data-plane
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, 
DataPlaneAcceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, 
DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time, 
apiVersionManager.newRequestMetrics)
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@@ -113,7 +113,7 @@ class SocketServer(
   private var stopped = false
 
   // Socket server metrics
-  
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent",
 () => SocketServer.this.synchronized {
+  metricsGroup.newGauge(s"NetworkProcessorAvgIdlePercent", () => 
SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
     val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
       metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
@@ -129,7 +129,7 @@ class SocketServer(
 
   metricsGroup.newGauge("MemoryPoolAvailable", () => 
memoryPool.availableMemory)
   metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - 
memoryPool.availableMemory)
-  
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount",
 () => SocketServer.this.synchronized {
+  metricsGroup.newGauge(s"ExpiredConnectionsKilledCount", () => 
SocketServer.this.synchronized {
     val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => 
a.processors)
     val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p 
=>
       metrics.metricName("expired-connections-killed-count", MetricsGroup, 
p.metricTags)
@@ -370,8 +370,6 @@ object SocketServer {
 }
 
 object DataPlaneAcceptor {
-  val ThreadPrefix: String = "data-plane"
-  val MetricPrefix: String = ""
   val ListenerReconfigurableConfigs: Set[String] = 
Set(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
 }
 
@@ -402,9 +400,6 @@ class DataPlaneAcceptor(socketServer: SocketServer,
                    memoryPool,
                    apiVersionManager) with ListenerReconfigurable {
 
-  override def metricPrefix(): String = DataPlaneAcceptor.MetricPrefix
-  override def threadPrefix(): String = DataPlaneAcceptor.ThreadPrefix
-
   /**
    * Returns the listener name associated with this reconfigurable. 
Listener-specific
    * configs corresponding to this listener name are provided for 
reconfiguration.
@@ -495,9 +490,6 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
 
   val shouldRun = new AtomicBoolean(true)
 
-  def metricPrefix(): String
-  def threadPrefix(): String
-
   private val sendBufferSize = config.socketSendBufferBytes
   private val recvBufferSize = config.socketReceiveBufferBytes
   private val listenBacklogSize = config.socketListenBacklogSize
@@ -522,7 +514,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   // Build the metric name explicitly in order to keep the existing name for 
compatibility
   private val backwardCompatibilityMetricGroup = new 
KafkaMetricsGroup("kafka.network", "Acceptor")
   private val blockedPercentMeterMetricName = 
backwardCompatibilityMetricGroup.metricName(
-    s"${metricPrefix()}AcceptorBlockedPercent",
+    "AcceptorBlockedPercent",
     Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
   private val blockedPercentMeter = 
metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", 
TimeUnit.NANOSECONDS)
   private var currentProcessorIndex = 0
@@ -531,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   private[network] val startedFuture = new CompletableFuture[Void]()
 
   val thread: KafkaThread = KafkaThread.nonDaemon(
-    
s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
+    
s"data-plane-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
   def start(): Unit = synchronized {
@@ -769,7 +761,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
                    listenerName: ListenerName,
                    securityProtocol: SecurityProtocol,
                    connectionDisconnectListeners: 
Seq[ConnectionDisconnectListener]): Processor = {
-    val name = 
s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
+    val name = 
s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
     new Processor(id,
                   time,
                   config.socketRequestMaxBytes,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 31ebc14c960..5f52e6abdcb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -21,7 +21,7 @@ import kafka.coordinator.group.{CoordinatorLoaderImpl, 
CoordinatorPartitionWrite
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.LogManager
 import kafka.log.remote.RemoteLogManager
-import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.network.SocketServer
 import kafka.raft.KafkaRaftManager
 import kafka.server.metadata._
 import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, 
SharePartitionManager}
@@ -470,8 +470,7 @@ class BrokerServer(
 
       dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
         socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
-        config.numIoThreads, 
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
-        DataPlaneAcceptor.ThreadPrefix)
+        config.numIoThreads, "RequestHandlerAvgIdlePercent")
 
       // Start RemoteLogManager before initializing broker metadata publishers.
       remoteLogManagerOpt.foreach { rlm =>
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index e7537878ca6..3ddf97d2705 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.network.SocketServer
 import kafka.raft.KafkaRaftManager
 import kafka.server.QuotaFactory.QuotaManagers
 
@@ -285,8 +285,7 @@ class ControllerServer(
         controllerApis,
         time,
         config.numIoThreads,
-        s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
-        DataPlaneAcceptor.ThreadPrefix,
+        "RequestHandlerAvgIdlePercent",
         "controller")
 
       // Set up the metadata cache publisher.
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala 
b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index fa251e27529..815fe4966eb 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -199,7 +199,6 @@ class KafkaRequestHandlerPool(
   time: Time,
   numThreads: Int,
   requestHandlerAvgIdleMetricName: String,
-  logAndThreadNamePrefix : String,
   nodeName: String = "broker"
 ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@@ -208,7 +207,7 @@ class KafkaRequestHandlerPool(
   /* a meter to track the average free capacity of the request handlers */
   private val aggregateIdleMeter = 
metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", 
TimeUnit.NANOSECONDS)
 
-  this.logIdent = s"[$logAndThreadNamePrefix Kafka Request Handler on 
${nodeName.capitalize} $brokerId] "
+  this.logIdent = s"[data-plane Kafka Request Handler on 
${nodeName.capitalize} $brokerId] "
   val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
   for (i <- 0 until numThreads) {
     createHandler(i)
@@ -216,7 +215,7 @@ class KafkaRequestHandlerPool(
 
   def createHandler(id: Int): Unit = synchronized {
     runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, 
threadPoolSize, requestChannel, apis, time, nodeName)
-    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + 
id, runnables(id)).start()
+    KafkaThread.daemon("data-plane-kafka-request-handler-" + id, 
runnables(id)).start()
   }
 
   def resizeThreadPool(newSize: Int): Unit = synchronized {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 418a276bbd1..69d296fe467 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -21,7 +21,7 @@ import java.net.InetSocketAddress
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import java.util.concurrent.{CompletableFuture, CountDownLatch, 
LinkedBlockingDeque, TimeUnit}
 import joptsimple.{OptionException, OptionSpec}
-import kafka.network.{DataPlaneAcceptor, SocketServer}
+import kafka.network.SocketServer
 import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
 import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, 
SimpleApiVersionManager}
 import kafka.utils.{CoreUtils, Logging}
@@ -130,8 +130,7 @@ class TestRaftServer(
       requestHandler,
       time,
       config.numIoThreads,
-      s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent",
-      DataPlaneAcceptor.ThreadPrefix
+      "RequestHandlerAvgIdlePercent"
     )
 
     workloadGenerator.start()
diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala 
b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
index 495ad0b1c00..6e8efa28b86 100644
--- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
+++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala
@@ -57,7 +57,7 @@ class KafkaRequestHandlerTest {
     val time = new MockTime()
     val startTime = time.nanoseconds()
     val metrics = new RequestChannelMetrics(Collections.emptySet[ApiKeys])
-    val requestChannel = new RequestChannel(10, "", time, metrics)
+    val requestChannel = new RequestChannel(10, time, metrics)
     val apiHandler = mock(classOf[ApiRequestHandler])
     try {
       val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), requestChannel, apiHandler, time)
@@ -95,7 +95,7 @@ class KafkaRequestHandlerTest {
     val time = new MockTime()
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
-    val requestChannel = new RequestChannel(10, "", time, metrics)
+    val requestChannel = new RequestChannel(10, time, metrics)
     val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), requestChannel, apiHandler, time)
 
     var handledCount = 0
@@ -131,7 +131,7 @@ class KafkaRequestHandlerTest {
     val time = new MockTime()
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
-    val requestChannel = new RequestChannel(10, "", time, metrics)
+    val requestChannel = new RequestChannel(10, time, metrics)
     val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), requestChannel, apiHandler, time)
 
     val originalRequestLocal = mock(classOf[RequestLocal])
@@ -165,7 +165,7 @@ class KafkaRequestHandlerTest {
     val time = new MockTime()
     val metrics = mock(classOf[RequestChannelMetrics])
     val apiHandler = mock(classOf[ApiRequestHandler])
-    val requestChannel = new RequestChannel(10, "", time, metrics)
+    val requestChannel = new RequestChannel(10, time, metrics)
     val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new 
AtomicInteger(1), requestChannel, apiHandler, time)
 
     val originalRequestLocal = mock(classOf[RequestLocal])

Reply via email to