This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch cmccabe_2023-06-21_some_minor_fixes
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 72737ab52235466324e2f553e4442fba4dc9cc2c
Author: Colin P. McCabe <[email protected]>
AuthorDate: Wed Jun 21 19:41:16 2023 -0700

    MINOR: some minor cleanups for process startup
    
    Move registration of the linux-disk-read-bytes and linux-disk-write-bytes
    metrics into a common function in LinuxIoMetricsCollector.scala. Also
    register these metrics for isolated controller processes.
    
    Create authorizers using AuthorizerUtils.configureAuthorizer and close them
    using AuthorizerUtils.closeAuthorizer. This ensures that we configure the
    authorizers immediately after creating them.
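
    For example (a sketch of the new pattern at the changed call sites, where
    authorizer is an Option[Authorizer] field):

        // Startup: create the authorizer and configure it immediately.
        authorizer = AuthorizerUtils.configureAuthorizer(config)

        // Shutdown: close it, swallowing and logging any exception.
        authorizer.foreach(AuthorizerUtils.closeAuthorizer)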
---
 .../kafka/metrics/LinuxIoMetricsCollector.scala      | 20 +++++++++++++++++++-
 .../kafka/security/authorizer/AuthorizerUtils.scala  | 17 +++++++++++++++--
 core/src/main/scala/kafka/server/BrokerServer.scala  |  9 +++++----
 .../main/scala/kafka/server/ControllerServer.scala   | 15 ++++++---------
 core/src/main/scala/kafka/server/KafkaServer.scala   |  6 +++---
 core/src/main/scala/kafka/server/Server.scala        | 12 ++++++++++++
 6 files changed, 60 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
index 5a41dbad73c..34dcaa312bf 100644
--- a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
+++ b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
@@ -17,9 +17,11 @@
 
 package kafka.metrics
 
-import java.nio.file.{Files, Paths}
+import com.yammer.metrics.core.{Gauge, MetricsRegistry}
 
+import java.nio.file.{Files, Paths}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.slf4j.Logger
 
 import scala.jdk.CollectionConverters._
@@ -94,6 +96,22 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg
       false
     }
   }
+
+  def maybeRegisterMetrics(registry: MetricsRegistry): Unit = {
+    def registerGauge(name: String, gauge: Gauge[Long]): Unit = {
+      val metricName = KafkaYammerMetrics.getMetricName(
+        "kafka.server",
+        "KafkaServer",
+        name
+      )
+      registry.newGauge(metricName, gauge)
+    }
+
+    if (usable()) {
+      registerGauge("linux-disk-read-bytes", () => readBytes())
+      registerGauge("linux-disk-write-bytes", () => writeBytes())
+    }
+  }
 }
 
 object LinuxIoMetricsCollector {
diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
index 0e417d677eb..db61e93c486 100644
--- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala
@@ -18,20 +18,33 @@
 package kafka.security.authorizer
 
 import java.net.InetAddress
-
 import kafka.network.RequestChannel.Session
+import kafka.server.KafkaConfig
+import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.resource.Resource
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
 
 
-object AuthorizerUtils {
+object AuthorizerUtils extends Logging {
 
   def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])
 
   def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)
 
+  def configureAuthorizer(config: KafkaConfig): Option[Authorizer] = {
+    val authorizerOpt = config.createNewAuthorizer()
+    authorizerOpt.foreach { authorizer =>
+      authorizer.configure(config.originals)
+    }
+    authorizerOpt
+  }
+
+  def closeAuthorizer(authorizer: Authorizer): Unit = {
+    CoreUtils.swallow(authorizer.close(), this)
+  }
+
   def sessionToRequestContext(session: Session): AuthorizableRequestContext = {
     new AuthorizableRequestContext {
       override def clientId(): String = ""
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 2bf29c32d97..59f9c655445 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -25,6 +25,7 @@ import kafka.log.remote.RemoteLogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
+import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.metadata.{BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.NetworkClient
@@ -75,6 +76,8 @@ class BrokerServer(
   // Get raftManager from SharedServer. It will be initialized during startup.
   def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager
 
+  Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)
+
   override def brokerState: BrokerState = Option(lifecycleManager).
     flatMap(m => Some(m.state)).getOrElse(BrokerState.NOT_RUNNING)
 
@@ -172,7 +175,6 @@ class BrokerServer(
       sharedServer.startForBroker()
 
       info("Starting broker")
-
       config.dynamicConfig.initialize(zkClientOpt = None)
 
       lifecycleManager = new BrokerLifecycleManager(config,
@@ -368,8 +370,7 @@ class BrokerServer(
       }
 
       // Create and initialize an authorizer if one is configured.
-      authorizer = config.createNewAuthorizer()
-      authorizer.foreach(_.configure(config.originals))
+      authorizer = AuthorizerUtils.configureAuthorizer(config)
 
       val fetchManager = new FetchManager(Time.SYSTEM,
         new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
@@ -568,7 +569,7 @@ class BrokerServer(
         CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
       if (controlPlaneRequestProcessor != null)
         CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
-      CoreUtils.swallow(authorizer.foreach(_.close()), this)
+      authorizer.foreach(AuthorizerUtils.closeAuthorizer)
 
       /**
       * We must shutdown the scheduler early because otherwise, the scheduler could touch other
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 36d8f5eca1e..59b50051772 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -22,6 +22,7 @@ import kafka.migration.MigrationPropagator
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
+import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp}
 import kafka.server.QuotaFactory.QuotaManagers
 
@@ -145,15 +146,10 @@ class ControllerServer(
       metricsGroup.newGauge("ClusterId", () => clusterId)
       metricsGroup.newGauge("yammer-metrics-count", () =>  
KafkaYammerMetrics.defaultRegistry.allMetrics.size)
 
-      linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
-      if (linuxIoMetricsCollector.usable()) {
-        metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
-        metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
-      }
+      Server.maybeRegisterLinuxMetrics(config, time, logger.underlying)
 
       val javaListeners = config.controllerListeners.map(_.toJava).asJava
-      authorizer = config.createNewAuthorizer()
-      authorizer.foreach(_.configure(config.originals))
+      authorizer = AuthorizerUtils.configureAuthorizer(config)
 
       val endpointReadyFutures = {
         val builder = new EndpointReadyFutures.Builder()
@@ -288,7 +284,8 @@ class ControllerServer(
         sharedServer.metaProps,
         controllerNodes.asScala.toSeq,
         apiVersionManager)
-      controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
+      controllerApisHandlerPool = new KafkaRequestHandlerPool(
+        config.nodeId,
         socketServer.dataPlaneRequestChannel,
         controllerApis,
         time,
@@ -397,7 +394,7 @@ class ControllerServer(
         controller.close()
       if (quorumControllerMetrics != null)
         CoreUtils.swallow(quorumControllerMetrics.close(), this)
-      CoreUtils.swallow(authorizer.foreach(_.close()), this)
+      authorizer.foreach(AuthorizerUtils.closeAuthorizer)
       createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
       alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this))
       socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 10acd74241c..93378d905fb 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -28,6 +28,7 @@ import kafka.metrics.KafkaMetricsReporter
 import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
+import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
 import kafka.utils._
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
@@ -488,8 +489,7 @@ class KafkaServer(
         )
 
         /* Get the authorizer and initialize it if one is specified.*/
-        authorizer = config.createNewAuthorizer()
-        authorizer.foreach(_.configure(config.originals))
+        authorizer = AuthorizerUtils.configureAuthorizer(config)
         val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
           case Some(authZ) =>
             authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) =>
@@ -912,7 +912,7 @@ class KafkaServer(
           CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
         if (controlPlaneRequestProcessor != null)
           CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
-        CoreUtils.swallow(authorizer.foreach(_.close()), this)
+        authorizer.foreach(AuthorizerUtils.closeAuthorizer)
         if (adminManager != null)
           CoreUtils.swallow(adminManager.shutdown(), this)
 
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
index d85060cc72d..c5c865aa5a4 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -16,9 +16,12 @@
  */
 package kafka.server
 
+import kafka.metrics.LinuxIoMetricsCollector
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.metrics.{KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.slf4j.Logger
 
 import java.util
 import java.util.concurrent.TimeUnit
@@ -45,6 +48,15 @@ object Server {
     buildMetrics(config, time, metricsContext)
   }
 
+  def maybeRegisterLinuxMetrics(
+    config: KafkaConfig,
+    time: Time,
+    logger: Logger
+  ): Unit = {
+    val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger)
+    linuxIoMetricsCollector.maybeRegisterMetrics(KafkaYammerMetrics.defaultRegistry())
+  }
+
   private def buildMetrics(
     config: KafkaConfig,
     time: Time,
