This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch cmccabe_2023-06-21_some_minor_fixes in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 72737ab52235466324e2f553e4442fba4dc9cc2c Author: Colin P. McCabe <[email protected]> AuthorDate: Wed Jun 21 19:41:16 2023 -0700 MINOR: some minor cleanups for process startup Move registration of linux-disk-read-bytes, linux-disk-write-bytes metrics into a common function in LinuxIoMetricsCollector.scala. Also do it for isolated controller processes. Create authorizers using AuthorizerUtils.configureAuthorizer and close them using AuthorizerUtils.closeAuthorizer. This ensures that we configure the authorizers immediately after creating them. --- .../kafka/metrics/LinuxIoMetricsCollector.scala | 20 +++++++++++++++++++- .../kafka/security/authorizer/AuthorizerUtils.scala | 17 +++++++++++++++-- core/src/main/scala/kafka/server/BrokerServer.scala | 9 +++++---- .../main/scala/kafka/server/ControllerServer.scala | 15 ++++++--------- core/src/main/scala/kafka/server/KafkaServer.scala | 6 +++--- core/src/main/scala/kafka/server/Server.scala | 12 ++++++++++++ 6 files changed, 60 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala index 5a41dbad73c..34dcaa312bf 100644 --- a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala +++ b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala @@ -17,9 +17,11 @@ package kafka.metrics -import java.nio.file.{Files, Paths} +import com.yammer.metrics.core.{Gauge, MetricsRegistry} +import java.nio.file.{Files, Paths} import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.slf4j.Logger import scala.jdk.CollectionConverters._ @@ -94,6 +96,22 @@ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logg false } } + + def maybeRegisterMetrics(registry: MetricsRegistry): Unit = { + def registerGauge(name: String, gauge: Gauge[Long]): Unit = { + val metricName = KafkaYammerMetrics.getMetricName( + "kafka.server", + "KafkaServer", + name + ) + registry.newGauge(metricName, gauge) + } + + if (usable()) { + registerGauge("linux-disk-read-bytes", () => readBytes()) + registerGauge("linux-disk-write-bytes", () => writeBytes()) + } + } } object LinuxIoMetricsCollector { diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala index 0e417d677eb..db61e93c486 100644 --- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala +++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala @@ -18,20 +18,33 @@ package kafka.security.authorizer import java.net.InetAddress - import kafka.network.RequestChannel.Session +import kafka.server.KafkaConfig +import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.resource.Resource import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer} -object AuthorizerUtils { +object AuthorizerUtils extends Logging{ def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer]) def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME) + def configureAuthorizer(config: KafkaConfig): Option[Authorizer] = { + val authorizerOpt = config.createNewAuthorizer() + authorizerOpt.foreach { authorizer => + authorizer.configure(config.originals) + } + authorizerOpt + } + + def closeAuthorizer(authorizer: Authorizer): Unit = { + CoreUtils.swallow(authorizer.close(), this) + } + def sessionToRequestContext(session: Session): AuthorizableRequestContext = { new AuthorizableRequestContext { override def clientId(): String = "" diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 2bf29c32d97..59f9c655445 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -25,6 +25,7 @@ import kafka.log.remote.RemoteLogManager import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.KafkaRaftManager import kafka.security.CredentialProvider +import kafka.security.authorizer.AuthorizerUtils import kafka.server.metadata.{BrokerMetadataPublisher, ClientQuotaMetadataManager, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, ScramPublisher} import kafka.utils.CoreUtils import org.apache.kafka.clients.NetworkClient @@ -75,6 +76,8 @@ class BrokerServer( // Get raftManager from SharedServer. It will be initialized during startup. def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager + Server.maybeRegisterLinuxMetrics(config, time, logger.underlying) + override def brokerState: BrokerState = Option(lifecycleManager). flatMap(m => Some(m.state)).getOrElse(BrokerState.NOT_RUNNING) @@ -172,7 +175,6 @@ class BrokerServer( sharedServer.startForBroker() info("Starting broker") - config.dynamicConfig.initialize(zkClientOpt = None) lifecycleManager = new BrokerLifecycleManager(config, @@ -368,8 +370,7 @@ class BrokerServer( } // Create and initialize an authorizer if one is configured. - authorizer = config.createNewAuthorizer() - authorizer.foreach(_.configure(config.originals)) + authorizer = AuthorizerUtils.configureAuthorizer(config) val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, @@ -568,7 +569,7 @@ class BrokerServer( CoreUtils.swallow(dataPlaneRequestProcessor.close(), this) if (controlPlaneRequestProcessor != null) CoreUtils.swallow(controlPlaneRequestProcessor.close(), this) - CoreUtils.swallow(authorizer.foreach(_.close()), this) + authorizer.foreach(AuthorizerUtils.closeAuthorizer) /** * We must shutdown the scheduler early because otherwise, the scheduler could touch other diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 36d8f5eca1e..59b50051772 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -22,6 +22,7 @@ import kafka.migration.MigrationPropagator import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.raft.KafkaRaftManager import kafka.security.CredentialProvider +import kafka.security.authorizer.AuthorizerUtils import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, CreateTopicPolicyClassNameProp} import kafka.server.QuotaFactory.QuotaManagers @@ -145,15 +146,10 @@ class ControllerServer( metricsGroup.newGauge("ClusterId", () => clusterId) metricsGroup.newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size) - linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying) - if (linuxIoMetricsCollector.usable()) { - metricsGroup.newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes()) - metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes()) - } + Server.maybeRegisterLinuxMetrics(config, time, logger.underlying) val javaListeners = config.controllerListeners.map(_.toJava).asJava - authorizer = config.createNewAuthorizer() - authorizer.foreach(_.configure(config.originals)) + authorizer = AuthorizerUtils.configureAuthorizer(config) val endpointReadyFutures = { val builder = new EndpointReadyFutures.Builder() @@ -288,7 +284,8 @@ class ControllerServer( sharedServer.metaProps, controllerNodes.asScala.toSeq, apiVersionManager) - controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId, + controllerApisHandlerPool = new KafkaRequestHandlerPool( + config.nodeId, socketServer.dataPlaneRequestChannel, controllerApis, time, @@ -397,7 +394,7 @@ class ControllerServer( controller.close() if (quorumControllerMetrics != null) CoreUtils.swallow(quorumControllerMetrics.close(), this) - CoreUtils.swallow(authorizer.foreach(_.close()), this) + authorizer.foreach(AuthorizerUtils.closeAuthorizer(_)) createTopicPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this)) alterConfigPolicy.foreach(policy => CoreUtils.swallow(policy.close(), this)) socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down")) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 10acd74241c..93378d905fb 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -28,6 +28,7 @@ import kafka.metrics.KafkaMetricsReporter import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer} import kafka.raft.KafkaRaftManager import kafka.security.CredentialProvider +import kafka.security.authorizer.AuthorizerUtils import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache} import kafka.utils._ import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient} @@ -488,8 +489,7 @@ class KafkaServer( ) /* Get the authorizer and initialize it if one is specified.*/ - authorizer = config.createNewAuthorizer() - authorizer.foreach(_.configure(config.originals)) + authorizer = AuthorizerUtils.configureAuthorizer(config) val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match { case Some(authZ) => authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) => @@ -912,7 +912,7 @@ class KafkaServer( CoreUtils.swallow(dataPlaneRequestProcessor.close(), this) if (controlPlaneRequestProcessor != null) CoreUtils.swallow(controlPlaneRequestProcessor.close(), this) - CoreUtils.swallow(authorizer.foreach(_.close()), this) + authorizer.foreach(AuthorizerUtils.closeAuthorizer) if (adminManager != null) CoreUtils.swallow(adminManager.shutdown(), this) diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala index d85060cc72d..c5c865aa5a4 100644 --- a/core/src/main/scala/kafka/server/Server.scala +++ b/core/src/main/scala/kafka/server/Server.scala @@ -16,9 +16,12 @@ */ package kafka.server +import kafka.metrics.LinuxIoMetricsCollector import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.metrics.{KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor} import org.apache.kafka.common.utils.Time +import org.apache.kafka.server.metrics.KafkaYammerMetrics +import org.slf4j.Logger import java.util import java.util.concurrent.TimeUnit @@ -45,6 +48,15 @@ object Server { buildMetrics(config, time, metricsContext) } + def maybeRegisterLinuxMetrics( + config: KafkaConfig, + time: Time, + logger: Logger + ): Unit = { + val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger) + linuxIoMetricsCollector.maybeRegisterMetrics(KafkaYammerMetrics.defaultRegistry()) + } + private def buildMetrics( config: KafkaConfig, time: Time,
