This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06462c7be1 KAFKA-14051: Create metrics reporters in KRaft remote 
controllers (#12396)
06462c7be1 is described below

commit 06462c7be175cb730b20de0333d1ac48b3286282
Author: Ron Dagostino <[email protected]>
AuthorDate: Thu Jul 21 19:59:05 2022 -0400

    KAFKA-14051: Create metrics reporters in KRaft remote controllers (#12396)
    
    KRaft remote controllers do not yet support dynamic reconfiguration 
(https://issues.apache.org/jira/browse/KAFKA-14057). Until that is implemented, 
we observe that the instantiation of the configured metric 
reporters is actually performed as part of the wiring for dynamic 
reconfiguration. Since that wiring does not yet exist for KRaft remote 
controllers, this patch refactors the instantiation of the metric reporters 
out of their reconfiguration and adjusts the contro [...]
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/server/ControllerServer.scala | 15 +++++++
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 46 +++++++++++++++-------
 2 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 67b3f0276d..cff88d2b6b 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -28,6 +28,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig.{AlterConfigPolicyClassNameProp, 
CreateTopicPolicyClassNameProp}
+import kafka.server.KafkaRaftServer.BrokerRole
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.clients.ApiVersions
@@ -68,6 +69,8 @@ class ControllerServer(
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._
 
+  config.dynamicConfig.initialize(zkClientOpt = None)
+
   val lock = new ReentrantLock()
   val awaitShutdownCond = lock.newCondition()
   var status: ProcessStatus = SHUTDOWN
@@ -97,6 +100,13 @@ class ControllerServer(
     true
   }
 
+  private def doRemoteKraftSetup(): Unit = {
+    // Explicitly configure metric reporters on this remote controller.
+    // We do not yet support dynamic reconfiguration on remote controllers in 
general;
+    // remove this once that is implemented.
+    new DynamicMetricReporterState(config.nodeId, config, metrics, clusterId)
+  }
+
   def clusterId: String = metaProperties.clusterId
 
   def startup(): Unit = {
@@ -202,6 +212,11 @@ class ControllerServer(
       }
       controller = controllerBuilder.build()
 
+      // Perform any setup that is done only when this node is a 
controller-only node.
+      if (!config.processRoles.contains(BrokerRole)) {
+        doRemoteKraftSetup()
+      }
+
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
threadNamePrefix.getOrElse(""))
       controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
         authorizer,
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 33511147e6..d7828ce0c0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, SslConfigs}
-import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
@@ -254,7 +254,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       case _ =>
     }
     addReconfigurable(kafkaServer.kafkaYammerMetrics)
-    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, 
kafkaServer))
+    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, 
kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
     addReconfigurable(new DynamicClientQuotaCallback(kafkaServer))
 
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
@@ -744,17 +744,18 @@ class DynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable {
   }
 }
 
-class DynamicMetricsReporters(brokerId: Int, server: KafkaBroker) extends 
Reconfigurable {
+class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: 
Metrics, clusterId: String) extends Reconfigurable {
+  private val reporterState = new DynamicMetricReporterState(brokerId, config, 
metrics, clusterId)
+  private val currentReporters = reporterState.currentReporters
+  private val dynamicConfig = reporterState.dynamicConfig
 
-  private val dynamicConfig = server.config.dynamicConfig
-  private val metrics = server.metrics
-  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> 
brokerId.toString)
-  private val currentReporters = mutable.Map[String, MetricsReporter]()
+  private def metricsReporterClasses(configs: util.Map[String, _]): 
mutable.Buffer[String] =
+    reporterState.metricsReporterClasses(configs)
 
-  
createReporters(dynamicConfig.currentKafkaConfig.getList(KafkaConfig.MetricReporterClassesProp),
-    Collections.emptyMap[String, Object])
+  private def createReporters(reporterClasses: util.List[String], 
updatedConfigs: util.Map[String, _]): Unit =
+    reporterState.createReporters(reporterClasses, updatedConfigs)
 
-  private[server] def currentMetricsReporters: List[MetricsReporter] = 
currentReporters.values.toList
+  private def removeReporter(className: String): Unit = 
reporterState.removeReporter(className)
 
   override def configure(configs: util.Map[String, _]): Unit = {}
 
@@ -797,8 +798,23 @@ class DynamicMetricsReporters(brokerId: Int, server: 
KafkaBroker) extends Reconf
     val added = updatedMetricsReporters.filterNot(currentReporters.keySet)
     createReporters(added.asJava, configs)
   }
+}
+
+class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: 
Metrics, clusterId: String) {
+  private[server] val dynamicConfig = config.dynamicConfig
+  private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> 
brokerId.toString)
+  private[server] val currentReporters = mutable.Map[String, MetricsReporter]()
+  createReporters(config, clusterId, 
metricsReporterClasses(dynamicConfig.currentKafkaConfig.values()).asJava,
+    Collections.emptyMap[String, Object])
+
+  private[server] def createReporters(reporterClasses: util.List[String],
+                                      updatedConfigs: util.Map[String, _]): 
Unit = {
+    createReporters(config, clusterId, reporterClasses, updatedConfigs)
+  }
 
-  private def createReporters(reporterClasses: util.List[String],
+  private def createReporters(config: KafkaConfig,
+                              clusterId: String,
+                              reporterClasses: util.List[String],
                               updatedConfigs: util.Map[String, _]): Unit = {
     val props = new util.HashMap[String, AnyRef]
     updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
@@ -807,19 +823,19 @@ class DynamicMetricsReporters(brokerId: Int, server: 
KafkaBroker) extends Reconf
     // Call notifyMetricsReporters first to satisfy the contract for 
MetricsReporter.contextChange,
     // which provides that MetricsReporter.contextChange must be called before 
the first call to MetricsReporter.init.
     // The first call to MetricsReporter.init is done when we call 
metrics.addReporter below.
-    KafkaBroker.notifyMetricsReporters(server.clusterId, server.config, 
reporters.asScala)
+    KafkaBroker.notifyMetricsReporters(clusterId, config, reporters.asScala)
     reporters.forEach { reporter =>
       metrics.addReporter(reporter)
       currentReporters += reporter.getClass.getName -> reporter
     }
-    KafkaBroker.notifyClusterListeners(server.clusterId, reporters.asScala)
+    KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala)
   }
 
-  private def removeReporter(className: String): Unit = {
+  private[server] def removeReporter(className: String): Unit = {
     currentReporters.remove(className).foreach(metrics.removeReporter)
   }
 
-  private def metricsReporterClasses(configs: util.Map[String, _]): 
mutable.Buffer[String] = {
+  private[server] def metricsReporterClasses(configs: util.Map[String, _]): 
mutable.Buffer[String] = {
     
configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
   }
 }

Reply via email to