This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 13a83d58f89 KAFKA-15853: Move ProcessRole to server module (#15166)
13a83d58f89 is described below

commit 13a83d58f897de2f55d8d3342ffb058b230a9183
Author: Omnia Ibrahim <[email protected]>
AuthorDate: Wed Jan 10 23:13:06 2024 +0000

    KAFKA-15853: Move ProcessRole to server module (#15166)
    
    Prepare to move KafkaConfig (#15103).
    
    Reviewers: Ismael Juma <[email protected]>
---
 core/src/main/scala/kafka/raft/RaftManager.scala   |  4 +--
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  4 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala | 16 +++++-----
 .../main/scala/kafka/server/KafkaRaftServer.scala  | 14 ++-------
 .../src/main/scala/kafka/server/SharedServer.scala |  8 ++---
 .../scala/unit/kafka/raft/RaftManagerTest.scala    | 18 +++++-------
 .../java/org/apache/kafka/server/ProcessRole.java  | 34 ++++++++++++++++++++++
 7 files changed, 61 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala 
b/core/src/main/scala/kafka/raft/RaftManager.scala
index d80a0d50137..a9e64fb967b 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,6 @@ import java.util.OptionalInt
 import java.util.concurrent.CompletableFuture
 import kafka.log.LogManager
 import kafka.log.UnifiedLog
-import kafka.server.KafkaRaftServer.ControllerRole
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils
 import kafka.utils.FileLock
@@ -42,6 +41,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, 
NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
 import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel, 
KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, RaftConfig, 
ReplicatedLog}
+import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.KafkaScheduler
 import org.apache.kafka.server.fault.FaultHandler
@@ -120,7 +120,7 @@ class KafkaRaftManager[T](
       .map(Paths.get(_).toAbsolutePath)
       .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
     // Or this node is only a controller
-    val isOnlyController = config.processRoles == Set(ControllerRole)
+    val isOnlyController = config.processRoles == Set(ProcessRole.ControllerRole)
 
     if (differentMetadataLogDir || isOnlyController) {
       Some(KafkaRaftManager.lockDataDir(new File(config.metadataLogDir)))
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index d63272c3731..7f607d49a4f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,7 +25,6 @@ import kafka.cluster.EndPoint
 import kafka.log.{LogCleaner, LogManager}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.server.DynamicBrokerConfig._
-import kafka.server.KafkaRaftServer.BrokerRole
 import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -36,6 +35,7 @@ import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.utils.{ConfigUtils, Utils}
+import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, 
ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin
@@ -287,7 +287,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       case Some(authz: Reconfigurable) => addReconfigurable(authz)
       case _ =>
     }
-    if (!kafkaConfig.processRoles.contains(BrokerRole)) {
+    if (!kafkaConfig.processRoles.contains(ProcessRole.BrokerRole)) {
       // only add these if the controller isn't also running the broker role
       // because these would already be added via the broker in that case
       addReconfigurable(controller.kafkaYammerMetrics)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 03d55c309e8..2afb1d64387 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -25,7 +25,6 @@ import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
 import kafka.security.authorizer.AuthorizerUtils
 import kafka.server.KafkaConfig.{ControllerListenerNamesProp, 
ListenerSecurityProtocolMapProp}
-import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
 import kafka.utils.CoreUtils.parseCsvList
 import kafka.utils.{CoreUtils, Logging}
 import kafka.utils.Implicits._
@@ -45,6 +44,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor, 
RangeAssignor, UniformAssignor}
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{MetadataVersion, 
MetadataVersionValidator}
 import org.apache.kafka.server.common.MetadataVersion._
@@ -1771,8 +1771,8 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
 
   private def parseProcessRoles(): Set[ProcessRole] = {
     val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
-      case "broker" => BrokerRole
-      case "controller" => ControllerRole
+      case "broker" => ProcessRole.BrokerRole
+      case "controller" => ProcessRole.ControllerRole
       case role => throw new ConfigException(s"Unknown process role '$role'" +
         " (only 'broker' and 'controller' are allowed roles)")
     }
@@ -1787,7 +1787,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   }
 
   def isKRaftCombinedMode: Boolean = {
-    processRoles == Set(BrokerRole, ControllerRole)
+    processRoles == Set(ProcessRole.BrokerRole, ProcessRole.ControllerRole)
   }
 
   def metadataLogDir: String = {
@@ -2362,9 +2362,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
       require(advertisedListenerNames.nonEmpty,
         "There must be at least one advertised listener." + (
-          if (processRoles.contains(BrokerRole)) s" Perhaps all listeners 
appear in $ControllerListenerNamesProp?" else ""))
+          if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all listeners appear in $ControllerListenerNamesProp?" else ""))
     }
-    if (processRoles == Set(BrokerRole)) {
+    if (processRoles == Set(ProcessRole.BrokerRole)) {
       // KRaft broker-only
       validateNonEmptyQuorumVotersForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
@@ -2391,7 +2391,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
         warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple 
entries; only the first will be used since 
${KafkaConfig.ProcessRolesProp}=broker: ${controllerListenerNames.asJava}")
       }
       validateAdvertisedListenersNonEmptyForBroker()
-    } else if (processRoles == Set(ControllerRole)) {
+    } else if (processRoles == Set(ProcessRole.ControllerRole)) {
       // KRaft controller-only
       validateNonEmptyQuorumVotersForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
@@ -2439,7 +2439,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     }
 
     val listenerNames = listeners.map(_.listenerName).toSet
-    if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
+    if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
       // validations for all broker setups (i.e. ZooKeeper and KRaft 
broker-only and KRaft co-located)
       validateAdvertisedListenersNonEmptyForBroker()
       require(advertisedListenerNames.contains(interBrokerListenerName),
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala 
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 32bd6f4937b..4ae5a8b0237 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -20,7 +20,6 @@ import java.io.File
 import java.util.concurrent.CompletableFuture
 import kafka.log.UnifiedLog
 import kafka.metrics.KafkaMetricsReporter
-import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
 import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.internals.Topic
@@ -31,6 +30,7 @@ import 
org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
 import 
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
 REQUIRE_METADATA_LOG_DIR}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble}
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.storage.internals.log.LogConfig
@@ -77,13 +77,13 @@ class KafkaRaftServer(
     new StandardFaultHandlerFactory(),
   )
 
-  private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
+  private val broker: Option[BrokerServer] = if (config.processRoles.contains(ProcessRole.BrokerRole)) {
     Some(new BrokerServer(sharedServer))
   } else {
     None
   }
 
-  private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
+  private val controller: Option[ControllerServer] = if (config.processRoles.contains(ProcessRole.ControllerRole)) {
     Some(new ControllerServer(
       sharedServer,
       KafkaRaftServer.configSchema,
@@ -121,14 +121,6 @@ object KafkaRaftServer {
   val MetadataPartition = Topic.CLUSTER_METADATA_TOPIC_PARTITION
   val MetadataTopicId = Uuid.METADATA_TOPIC_ID
 
-  sealed trait ProcessRole
-  case object BrokerRole extends ProcessRole {
-    override def toString: String = "broker"
-  }
-  case object ControllerRole extends ProcessRole {
-    override def toString: String = "controller"
-  }
-
   /**
    * Initialize the configured log directories, including both 
[[KafkaConfig.MetadataLogDirProp]]
    * and [[KafkaConfig.LogDirProp]]. This method performs basic validation to 
ensure that all
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index b093d66995f..31b4957e3f5 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.raft.KafkaRaftManager
-import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
 import kafka.server.Server.MetricsPrefix
 import kafka.server.metadata.BrokerServerMetrics
 import kafka.utils.{CoreUtils, Logging}
@@ -33,6 +32,7 @@ import 
org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
 import org.apache.kafka.raft.RaftConfig.AddressSpec
+import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, 
ProcessTerminatingFaultHandler}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -171,7 +171,7 @@ class SharedServer(
    */
   def metadataLoaderFaultHandler: FaultHandler = faultHandlerFactory.build(
     name = "metadata loading",
-    fatal = sharedServerConfig.processRoles.contains(ControllerRole),
+    fatal = sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole),
     action = () => SharedServer.this.synchronized {
       Option(brokerMetrics).foreach(_.metadataLoadErrorCount.getAndIncrement())
       Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
@@ -247,10 +247,10 @@ class SharedServer(
         }
         sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None, 
clientMetricsReceiverPluginOpt = None)
 
-        if (sharedServerConfig.processRoles.contains(BrokerRole)) {
+        if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
           brokerMetrics = BrokerServerMetrics(metrics)
         }
-        if (sharedServerConfig.processRoles.contains(ControllerRole)) {
+        if (sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole)) {
           controllerServerMetrics = new 
ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
         }
         val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala 
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index d42d6141ac9..711426468c5 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -24,9 +24,6 @@ import java.util.Properties
 import java.util.concurrent.CompletableFuture
 import kafka.log.LogManager
 import kafka.server.KafkaConfig
-import kafka.server.KafkaRaftServer.BrokerRole
-import kafka.server.KafkaRaftServer.ControllerRole
-import kafka.server.KafkaRaftServer.ProcessRole
 import kafka.utils.TestUtils
 import kafka.tools.TestRaftServer.ByteArraySerde
 import org.apache.kafka.common.TopicPartition
@@ -34,6 +31,7 @@ import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.ProcessRole
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
@@ -58,16 +56,16 @@ class RaftManagerTest {
     props.setProperty(KafkaConfig.ProcessRolesProp, processRoles.mkString(","))
     props.setProperty(KafkaConfig.NodeIdProp, nodeId.toString)
     props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
-    if (processRoles.contains(BrokerRole)) {
+    if (processRoles.contains(ProcessRole.BrokerRole)) {
       props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
-      if (processRoles.contains(ControllerRole)) { // co-located
+      if (processRoles.contains(ProcessRole.ControllerRole)) { // co-located
         props.setProperty(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
         props.setProperty(KafkaConfig.QuorumVotersProp, 
s"${nodeId}@localhost:9093")
       } else { // broker-only
         val voterId = nodeId + 1
         props.setProperty(KafkaConfig.QuorumVotersProp, 
s"${voterId}@localhost:9093")
       }
-    } else if (processRoles.contains(ControllerRole)) { // controller-only
+    } else if (processRoles.contains(ProcessRole.ControllerRole)) { // controller-only
       props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
       props.setProperty(KafkaConfig.QuorumVotersProp, 
s"${nodeId}@localhost:9093")
     }
@@ -100,10 +98,10 @@ class RaftManagerTest {
   def testNodeIdPresent(processRoles: String): Unit = {
     var processRolesSet = Set.empty[ProcessRole]
     if (processRoles.contains("broker")) {
-      processRolesSet = processRolesSet ++ Set(BrokerRole)
+      processRolesSet = processRolesSet ++ Set(ProcessRole.BrokerRole)
     }
     if (processRoles.contains("controller")) {
-      processRolesSet = processRolesSet ++ Set(ControllerRole)
+      processRolesSet = processRolesSet ++ Set(ProcessRole.ControllerRole)
     }
 
     val logDir = TestUtils.tempDir()
@@ -140,7 +138,7 @@ class RaftManagerTest {
     val raftManager = createRaftManager(
       new TopicPartition("__raft_id_test", 0),
       createConfig(
-        Set(ControllerRole),
+        Set(ProcessRole.ControllerRole),
         nodeId,
         logDir,
         metadataDir
@@ -164,7 +162,7 @@ class RaftManagerTest {
     val raftManager = createRaftManager(
       new TopicPartition("__raft_id_test", 0),
       createConfig(
-        Set(BrokerRole),
+        Set(ProcessRole.BrokerRole),
         nodeId,
         logDir,
         metadataDir
diff --git a/server/src/main/java/org/apache/kafka/server/ProcessRole.java 
b/server/src/main/java/org/apache/kafka/server/ProcessRole.java
new file mode 100644
index 00000000000..e4f7f70f594
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ProcessRole.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+public enum ProcessRole {
+    BrokerRole("broker"),
+    ControllerRole("controller");
+
+    private final String roleName;
+
+    ProcessRole(String roleName) {
+        this.roleName = roleName;
+    }
+
+    @Override
+    public String toString() {
+        return roleName;
+    }
+}

Reply via email to