This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 13a83d58f89 KAFKA-15853: Move ProcessRole to server module (#15166)
13a83d58f89 is described below
commit 13a83d58f897de2f55d8d3342ffb058b230a9183
Author: Omnia Ibrahim <[email protected]>
AuthorDate: Wed Jan 10 23:13:06 2024 +0000
KAFKA-15853: Move ProcessRole to server module (#15166)
Prepare to move KafkaConfig (#15103).
Reviewers: Ismael Juma <[email protected]>
---
core/src/main/scala/kafka/raft/RaftManager.scala | 4 +--
.../scala/kafka/server/DynamicBrokerConfig.scala | 4 +--
core/src/main/scala/kafka/server/KafkaConfig.scala | 16 +++++-----
.../main/scala/kafka/server/KafkaRaftServer.scala | 14 ++-------
.../src/main/scala/kafka/server/SharedServer.scala | 8 ++---
.../scala/unit/kafka/raft/RaftManagerTest.scala | 18 +++++-------
.../java/org/apache/kafka/server/ProcessRole.java | 34 ++++++++++++++++++++++
7 files changed, 61 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index d80a0d50137..a9e64fb967b 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,6 @@ import java.util.OptionalInt
import java.util.concurrent.CompletableFuture
import kafka.log.LogManager
import kafka.log.UnifiedLog
-import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.KafkaConfig
import kafka.utils.CoreUtils
import kafka.utils.FileLock
@@ -42,6 +41,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec,
NON_ROUTABLE_ADDRESS, UnknownAddressSpec}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaNetworkChannel,
KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, RaftClient, RaftConfig,
ReplicatedLog}
+import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.server.fault.FaultHandler
@@ -120,7 +120,7 @@ class KafkaRaftManager[T](
.map(Paths.get(_).toAbsolutePath)
.contains(Paths.get(config.metadataLogDir).toAbsolutePath)
// Or this node is only a controller
- val isOnlyController = config.processRoles == Set(ControllerRole)
+ val isOnlyController = config.processRoles ==
Set(ProcessRole.ControllerRole)
if (differentMetadataLogDir || isOnlyController) {
Some(KafkaRaftManager.lockDataDir(new File(config.metadataLogDir)))
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index d63272c3731..7f607d49a4f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,7 +25,6 @@ import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
-import kafka.server.KafkaRaftServer.BrokerRole
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -36,6 +35,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
+import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType,
ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin
@@ -287,7 +287,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
case Some(authz: Reconfigurable) => addReconfigurable(authz)
case _ =>
}
- if (!kafkaConfig.processRoles.contains(BrokerRole)) {
+ if (!kafkaConfig.processRoles.contains(ProcessRole.BrokerRole)) {
// only add these if the controller isn't also running the broker role
// because these would already be added via the broker in that case
addReconfigurable(controller.kafkaYammerMetrics)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 03d55c309e8..2afb1d64387 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -25,7 +25,6 @@ import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
import kafka.security.authorizer.AuthorizerUtils
import kafka.server.KafkaConfig.{ControllerListenerNamesProp,
ListenerSecurityProtocolMapProp}
-import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole, ProcessRole}
import kafka.utils.CoreUtils.parseCsvList
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
@@ -45,6 +44,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.assignor.{PartitionAssignor,
RangeAssignor, UniformAssignor}
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{MetadataVersion,
MetadataVersionValidator}
import org.apache.kafka.server.common.MetadataVersion._
@@ -1771,8 +1771,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
- case "broker" => BrokerRole
- case "controller" => ControllerRole
+ case "broker" => ProcessRole.BrokerRole
+ case "controller" => ProcessRole.ControllerRole
case role => throw new ConfigException(s"Unknown process role '$role'" +
" (only 'broker' and 'controller' are allowed roles)")
}
@@ -1787,7 +1787,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
def isKRaftCombinedMode: Boolean = {
- processRoles == Set(BrokerRole, ControllerRole)
+ processRoles == Set(ProcessRole.BrokerRole, ProcessRole.ControllerRole)
}
def metadataLogDir: String = {
@@ -2362,9 +2362,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def validateAdvertisedListenersNonEmptyForBroker(): Unit = {
require(advertisedListenerNames.nonEmpty,
"There must be at least one advertised listener." + (
- if (processRoles.contains(BrokerRole)) s" Perhaps all listeners
appear in $ControllerListenerNamesProp?" else ""))
+ if (processRoles.contains(ProcessRole.BrokerRole)) s" Perhaps all
listeners appear in $ControllerListenerNamesProp?" else ""))
}
- if (processRoles == Set(BrokerRole)) {
+ if (processRoles == Set(ProcessRole.BrokerRole)) {
// KRaft broker-only
validateNonEmptyQuorumVotersForKRaft()
validateControlPlaneListenerEmptyForKRaft()
@@ -2391,7 +2391,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
warn(s"${KafkaConfig.ControllerListenerNamesProp} has multiple
entries; only the first will be used since
${KafkaConfig.ProcessRolesProp}=broker: ${controllerListenerNames.asJava}")
}
validateAdvertisedListenersNonEmptyForBroker()
- } else if (processRoles == Set(ControllerRole)) {
+ } else if (processRoles == Set(ProcessRole.ControllerRole)) {
// KRaft controller-only
validateNonEmptyQuorumVotersForKRaft()
validateControlPlaneListenerEmptyForKRaft()
@@ -2439,7 +2439,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
}
val listenerNames = listeners.map(_.listenerName).toSet
- if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
+ if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole))
{
// validations for all broker setups (i.e. ZooKeeper and KRaft
broker-only and KRaft co-located)
validateAdvertisedListenersNonEmptyForBroker()
require(advertisedListenerNames.contains(interBrokerListenerName),
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 32bd6f4937b..4ae5a8b0237 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -20,7 +20,6 @@ import java.io.File
import java.util.concurrent.CompletableFuture
import kafka.log.UnifiedLog
import kafka.metrics.KafkaMetricsReporter
-import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.internals.Topic
@@ -31,6 +30,7 @@ import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
import
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble}
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.storage.internals.log.LogConfig
@@ -77,13 +77,13 @@ class KafkaRaftServer(
new StandardFaultHandlerFactory(),
)
- private val broker: Option[BrokerServer] = if
(config.processRoles.contains(BrokerRole)) {
+ private val broker: Option[BrokerServer] = if
(config.processRoles.contains(ProcessRole.BrokerRole)) {
Some(new BrokerServer(sharedServer))
} else {
None
}
- private val controller: Option[ControllerServer] = if
(config.processRoles.contains(ControllerRole)) {
+ private val controller: Option[ControllerServer] = if
(config.processRoles.contains(ProcessRole.ControllerRole)) {
Some(new ControllerServer(
sharedServer,
KafkaRaftServer.configSchema,
@@ -121,14 +121,6 @@ object KafkaRaftServer {
val MetadataPartition = Topic.CLUSTER_METADATA_TOPIC_PARTITION
val MetadataTopicId = Uuid.METADATA_TOPIC_ID
- sealed trait ProcessRole
- case object BrokerRole extends ProcessRole {
- override def toString: String = "broker"
- }
- case object ControllerRole extends ProcessRole {
- override def toString: String = "controller"
- }
-
/**
* Initialize the configured log directories, including both
[[KafkaConfig.MetadataLogDirProp]]
* and [[KafkaConfig.LogDirProp]]. This method performs basic validation to
ensure that all
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index b093d66995f..31b4957e3f5 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.raft.KafkaRaftManager
-import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
import kafka.server.Server.MetricsPrefix
import kafka.server.metadata.BrokerServerMetrics
import kafka.utils.{CoreUtils, Logging}
@@ -33,6 +32,7 @@ import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.RaftConfig.AddressSpec
+import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler,
ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -171,7 +171,7 @@ class SharedServer(
*/
def metadataLoaderFaultHandler: FaultHandler = faultHandlerFactory.build(
name = "metadata loading",
- fatal = sharedServerConfig.processRoles.contains(ControllerRole),
+ fatal =
sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole),
action = () => SharedServer.this.synchronized {
Option(brokerMetrics).foreach(_.metadataLoadErrorCount.getAndIncrement())
Option(controllerServerMetrics).foreach(_.incrementMetadataErrorCount())
@@ -247,10 +247,10 @@ class SharedServer(
}
sharedServerConfig.dynamicConfig.initialize(zkClientOpt = None,
clientMetricsReceiverPluginOpt = None)
- if (sharedServerConfig.processRoles.contains(BrokerRole)) {
+ if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
brokerMetrics = BrokerServerMetrics(metrics)
}
- if (sharedServerConfig.processRoles.contains(ControllerRole)) {
+ if
(sharedServerConfig.processRoles.contains(ProcessRole.ControllerRole)) {
controllerServerMetrics = new
ControllerMetadataMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()))
}
val _raftManager = new KafkaRaftManager[ApiMessageAndVersion](
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index d42d6141ac9..711426468c5 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -24,9 +24,6 @@ import java.util.Properties
import java.util.concurrent.CompletableFuture
import kafka.log.LogManager
import kafka.server.KafkaConfig
-import kafka.server.KafkaRaftServer.BrokerRole
-import kafka.server.KafkaRaftServer.ControllerRole
-import kafka.server.KafkaRaftServer.ProcessRole
import kafka.utils.TestUtils
import kafka.tools.TestRaftServer.ByteArraySerde
import org.apache.kafka.common.TopicPartition
@@ -34,6 +31,7 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.ProcessRole
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@@ -58,16 +56,16 @@ class RaftManagerTest {
props.setProperty(KafkaConfig.ProcessRolesProp, processRoles.mkString(","))
props.setProperty(KafkaConfig.NodeIdProp, nodeId.toString)
props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
- if (processRoles.contains(BrokerRole)) {
+ if (processRoles.contains(ProcessRole.BrokerRole)) {
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
- if (processRoles.contains(ControllerRole)) { // co-located
+ if (processRoles.contains(ProcessRole.ControllerRole)) { // co-located
props.setProperty(KafkaConfig.ListenersProp,
"PLAINTEXT://localhost:9092,SSL://localhost:9093")
props.setProperty(KafkaConfig.QuorumVotersProp,
s"${nodeId}@localhost:9093")
} else { // broker-only
val voterId = nodeId + 1
props.setProperty(KafkaConfig.QuorumVotersProp,
s"${voterId}@localhost:9093")
}
- } else if (processRoles.contains(ControllerRole)) { // controller-only
+ } else if (processRoles.contains(ProcessRole.ControllerRole)) { //
controller-only
props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
props.setProperty(KafkaConfig.QuorumVotersProp,
s"${nodeId}@localhost:9093")
}
@@ -100,10 +98,10 @@ class RaftManagerTest {
def testNodeIdPresent(processRoles: String): Unit = {
var processRolesSet = Set.empty[ProcessRole]
if (processRoles.contains("broker")) {
- processRolesSet = processRolesSet ++ Set(BrokerRole)
+ processRolesSet = processRolesSet ++ Set(ProcessRole.BrokerRole)
}
if (processRoles.contains("controller")) {
- processRolesSet = processRolesSet ++ Set(ControllerRole)
+ processRolesSet = processRolesSet ++ Set(ProcessRole.ControllerRole)
}
val logDir = TestUtils.tempDir()
@@ -140,7 +138,7 @@ class RaftManagerTest {
val raftManager = createRaftManager(
new TopicPartition("__raft_id_test", 0),
createConfig(
- Set(ControllerRole),
+ Set(ProcessRole.ControllerRole),
nodeId,
logDir,
metadataDir
@@ -164,7 +162,7 @@ class RaftManagerTest {
val raftManager = createRaftManager(
new TopicPartition("__raft_id_test", 0),
createConfig(
- Set(BrokerRole),
+ Set(ProcessRole.BrokerRole),
nodeId,
logDir,
metadataDir
diff --git a/server/src/main/java/org/apache/kafka/server/ProcessRole.java b/server/src/main/java/org/apache/kafka/server/ProcessRole.java
new file mode 100644
index 00000000000..e4f7f70f594
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ProcessRole.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+public enum ProcessRole {
+ BrokerRole("broker"),
+ ControllerRole("controller");
+
+ private final String roleName;
+
+ ProcessRole(String roleName) {
+ this.roleName = roleName;
+ }
+
+ @Override
+ public String toString() {
+ return roleName;
+ }
+}