This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 14daa23b590 KAFKA-18331: Make process.roles and node.id required configs (#18414)
14daa23b590 is described below

commit 14daa23b590304e0a224b24849a6eddac77b0973
Author: PoAn Yang <pay...@apache.org>
AuthorDate: Thu Jan 16 15:55:51 2025 +0800

    KAFKA-18331: Make process.roles and node.id required configs (#18414)
    
    In 4.0, there is no ZK mode and both of these configs are required in KRaft mode.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  16 ----
 .../test/scala/unit/kafka/KafkaConfigTest.scala    |  60 ++++++++++++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 103 +++++++++++++--------
 .../test/scala/unit/kafka/server/ServerTest.scala  |  21 +----
 .../apache/kafka/server/config/KRaftConfigs.java   |   5 +-
 5 files changed, 126 insertions(+), 79 deletions(-)
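
For reference, a minimal sketch of a broker configuration that parses after this change, mirroring the property keys exercised by the new tests below (the literal key names stand in for the KRaftConfigs/QuorumConfig constants; the host and port values are illustrative only):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    // process.roles and node.id now have no default value, so both must be set.
    props.setProperty("process.roles", "broker")
    props.setProperty("node.id", "0")
    // A controller quorum and controller listener names are still needed before
    // the config validates, as the new tests walk through step by step.
    props.setProperty("controller.quorum.bootstrap.servers", "localhost:9092")
    props.setProperty("controller.listener.names", "CONTROLLER")
    val config = KafkaConfig.fromProps(props)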

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a8b0cf4328a..e243d40bbb2 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -636,22 +636,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     if (nodeId != brokerId) {
       throw new ConfigException(s"You must set 
`${KRaftConfigs.NODE_ID_CONFIG}` to the same value as 
`${ServerConfigs.BROKER_ID_CONFIG}`.")
     }
-    if (requiresZookeeper) {
-      if (zkConnect == null) {
-        throw new ConfigException(s"Missing required configuration 
`${ZkConfigs.ZK_CONNECT_CONFIG}` which has no default value.")
-      }
-      if (brokerIdGenerationEnable) {
-        require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
-      } else {
-        require(brokerId >= 0, "broker.id must be greater than or equal to 0")
-      }
-    } else {
-      // KRaft-based metadata quorum
-      if (nodeId < 0) {
-        throw new ConfigException(s"Missing configuration 
`${KRaftConfigs.NODE_ID_CONFIG}` which is required " +
-          s"when `process.roles` is defined (i.e. when running in KRaft 
mode).")
-      }
-    }
     require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
     require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index fc46b22a38a..8834f6f3608 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -41,6 +41,66 @@ class KafkaConfigTest {
   @AfterEach
   def tearDown(): Unit = Exit.resetExitProcedure()
 
+  @Test
+  def testBrokerRequiredProperties(): Unit = {
+    val properties = new Properties()
+    assertBadConfigContainingMessage(properties,
+      "Missing required configuration \"process.roles\" which has no default value.")
+
+    properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    assertBadConfigContainingMessage(properties,
+      "Missing required configuration \"node.id\" which has no default value.")
+
+    properties.put(KRaftConfigs.NODE_ID_CONFIG, -1)
+    assertBadConfigContainingMessage(properties,
+      "Invalid value -1 for configuration node.id: Value must be at least 0")
+
+    properties.put(KRaftConfigs.NODE_ID_CONFIG, 0)
+    assertBadConfigContainingMessage(properties,
+      "If using process.roles, either controller.quorum.bootstrap.servers must contain the set of bootstrap controllers or controller.quorum.voters must contain a parseable set of controllers.")
+
+    properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+    assertBadConfigContainingMessage(properties,
+      "requirement failed: controller.listener.names must contain at least one value when running KRaft with just the broker role")
+
+    properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+    KafkaConfig.fromProps(properties)
+  }
+
+  @Test
+  def testControllerRequiredProperties(): Unit = {
+    val properties = new Properties()
+    assertBadConfigContainingMessage(properties,
+      "Missing required configuration \"process.roles\" which has no default value.")
+
+    properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    assertBadConfigContainingMessage(properties,
+      "Missing required configuration \"node.id\" which has no default value.")
+
+    properties.put(KRaftConfigs.NODE_ID_CONFIG, -1)
+    assertBadConfigContainingMessage(properties,
+      "Invalid value -1 for configuration node.id: Value must be at least 0")
+
+    properties.put(KRaftConfigs.NODE_ID_CONFIG, 0)
+    assertBadConfigContainingMessage(properties,
+      "If using process.roles, either controller.quorum.bootstrap.servers must contain the set of bootstrap controllers or controller.quorum.voters must contain a parseable set of controllers.")
+
+    properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+    assertBadConfigContainingMessage(properties,
+      "requirement failed: The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
+
+    properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
+    assertBadConfigContainingMessage(properties,
+      "No security protocol defined for listener CONTROLLER")
+
+    properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT")
+    assertBadConfigContainingMessage(properties,
+      "requirement failed: The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
+
+    properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+    KafkaConfig.fromProps(properties)
+  }
+
   @Test
   def testGetKafkaConfigFromArgs(): Unit = {
     val propertiesFile = prepareDefaultConfig()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2926b047180..971018ffc19 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -153,19 +153,22 @@ class KafkaConfigTest {
 
   @Test
   def testAdvertiseDefaults(): Unit = {
-    val port = 9999
+    val brokerProt = 9999
+    val controllerPort = 10000
     val hostName = "fake-host"
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$port")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, s"$hostName:$controllerPort")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$brokerProt")
     val serverConfig = KafkaConfig.fromProps(props)
 
     val endpoints = serverConfig.effectiveAdvertisedBrokerListeners
     assertEquals(1, endpoints.size)
    val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
     assertEquals(endpoint.host, hostName)
-    assertEquals(endpoint.port, port)
+    assertEquals(endpoint.port, brokerProt)
   }
 
   @Test
@@ -187,8 +190,10 @@ class KafkaConfigTest {
   @Test
   def testDuplicateListeners(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9095")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")

    // listeners with duplicate port
    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
@@ -199,7 +204,7 @@
    assertBadConfigContainingMessage(props, "Each listener must have a different name")

    // advertised listeners can have duplicate ports
-    props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "HOST:SASL_SSL,LB:SASL_SSL")
+    props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "HOST:SASL_SSL,LB:SASL_SSL,CONTROLLER:SASL_SSL")
    props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "HOST")
    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "HOST://localhost:9091,LB://localhost:9092")
    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "HOST://localhost:9091,LB://localhost:9091")
@@ -213,8 +218,11 @@ class KafkaConfigTest {
   @Test
   def testIPv4AndIPv6SamePortListeners(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.put(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.put(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9091")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+    props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT + ",CONTROLLER:PLAINTEXT")

    props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
    var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
@@ -464,24 +472,13 @@
      KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER2")))
   }
 
-  @Test
-  def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
-    val props = new Properties()
-    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9092")
-    assertBadConfigContainingMessage(props,
-      "Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
-    // Valid now
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092")
-    assertEquals(None, KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER")))
-  }
-
   @Test
   def testBadListenerProtocol(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "BAD://localhost:9091")
 
     assertFalse(isValidKafkaConfig(props))
@@ -490,11 +487,13 @@ class KafkaConfigTest {
   @Test
   def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")

    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
-    props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
+    props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
    props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION")
     val config = KafkaConfig.fromProps(props)
     val expectedListeners = Seq(
@@ -506,7 +505,8 @@ class KafkaConfigTest {
     val expectedSecurityProtocolMap = Map(
       new ListenerName("CLIENT") -> SecurityProtocol.SSL,
       new ListenerName("REPLICATION") -> SecurityProtocol.SSL,
-      new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
+      new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT,
+      new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT
     )
    assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap)
   }
@@ -514,12 +514,14 @@ class KafkaConfigTest {
   @Test
   def testListenerAndAdvertisedListenerNames(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")

    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
-    props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,INTERNAL:PLAINTEXT")
+    props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
    props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "INTERNAL")
     val config = KafkaConfig.fromProps(props)
 
@@ -537,7 +539,8 @@ class KafkaConfigTest {
 
     val expectedSecurityProtocolMap = Map(
       new ListenerName("EXTERNAL") -> SecurityProtocol.SSL,
-      new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
+      new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT,
+      new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT
     )
    assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap)
   }
@@ -579,9 +582,12 @@ class KafkaConfigTest {
   @Test
   def testCaseInsensitiveListenerProtocol(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9093")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092")
+    props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
    val config = KafkaConfig.fromProps(props)
    assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
    assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString))
@@ -594,8 +600,10 @@ class KafkaConfigTest {
   @Test
   def testListenerDefaults(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9093")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
 
     // configuration with no listeners
     val conf = KafkaConfig.fromProps(props)
@@ -607,10 +615,12 @@ class KafkaConfigTest {
   @Test
   def testVersionConfiguration(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
    val conf = KafkaConfig.fromProps(props)
-    assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion)
+    assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, conf.interBrokerProtocolVersion)

    props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.0-IV1")
     val conf2 = KafkaConfig.fromProps(props)
@@ -773,7 +783,10 @@ class KafkaConfigTest {
   def testFromPropsInvalid(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
-      validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181")
+      validRequiredProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+      validRequiredProperties.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
+      validRequiredProperties.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+      validRequiredProperties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
       validRequiredProperties
     }
     // to ensure a basis is valid - bootstraps all needed validation
@@ -1070,7 +1083,10 @@ class KafkaConfigTest {
   def testDynamicLogConfigs(): Unit = {
     def baseProperties: Properties = {
       val validRequiredProperties = new Properties()
-      validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181")
+      validRequiredProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+      validRequiredProperties.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
+      validRequiredProperties.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9093")
+      validRequiredProperties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
       validRequiredProperties
     }
 
@@ -1161,7 +1177,9 @@ class KafkaConfigTest {
   @Test
   def testSpecificProperties(): Unit = {
     val defaults = new Properties()
-    defaults.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181")
+    defaults.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
+    defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
    // For ZkConnectionTimeoutMs
    defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234")
    defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
@@ -1180,7 +1198,6 @@
    defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString)
 
     val config = KafkaConfig.fromProps(defaults)
-    assertEquals("127.0.0.1:2181", config.zkConnect)
     assertEquals(1234, config.zkConnectionTimeoutMs)
     assertEquals(false, config.brokerIdGenerationEnable)
     assertEquals(1, config.maxReservedBrokerId)
@@ -1207,9 +1224,12 @@ class KafkaConfigTest {
   @Test
   def testNonroutableAdvertisedListeners(): Unit = {
     val props = new Properties()
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181")
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://0.0.0.0:9092")
-    assertFalse(isValidKafkaConfig(props))
+    assertBadConfigContainingMessage(props, "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.")
   }
 
   @Test
@@ -1513,6 +1533,7 @@ class KafkaConfigTest {
   @Test
   def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = {
     val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
     props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
     assertEquals("You must set `node.id` to the same value as `broker.id`.",
@@ -1524,8 +1545,7 @@ class KafkaConfigTest {
     val props = new Properties()
     props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
     props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
-    assertEquals("Missing configuration `node.id` which is required when 
`process.roles` " +
-      "is defined (i.e. when running in KRaft mode).",
+    assertEquals("Missing required configuration \"node.id\" which has no 
default value.",
       assertThrows(classOf[ConfigException], () => 
KafkaConfig.fromProps(props)).getMessage())
   }
 
@@ -1568,7 +1588,10 @@ class KafkaConfigTest {
   @Test
   def testSaslJwksEndpointRetryDefaults(): Unit = {
     val props = new Properties()
-    props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
    val config = KafkaConfig.fromProps(props)
    assertNotNull(config.getLong(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS))
    assertNotNull(config.getLong(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS))
diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala
index 62345c446e2..4b2b900b375 100644
--- a/core/src/test/scala/unit/kafka/server/ServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala
@@ -20,7 +20,7 @@ import java.util.Properties
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.metrics.MetricsContext
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ZkConfigs}
+import org.apache.kafka.server.config.KRaftConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
@@ -47,23 +47,4 @@ class ServerTest {
       Server.NodeIdLabel -> nodeId.toString
     ), context.contextLabels.asScala)
   }
-
-  @Test
-  def testCreateZkKafkaMetricsContext(): Unit = {
-    val brokerId = 0
-    val clusterId = Uuid.randomUuid().toString
-
-    val props = new Properties()
-    props.put(ServerConfigs.BROKER_ID_CONFIG, brokerId.toString)
-    props.put(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:0")
-    val config = KafkaConfig.fromProps(props)
-
-    val context = Server.createKafkaMetricsContext(config, clusterId)
-    assertEquals(Map(
-      MetricsContext.NAMESPACE -> Server.MetricsPrefix,
-      Server.ClusterIdLabel -> clusterId,
-      Server.BrokerIdLabel -> brokerId.toString
-    ), context.contextLabels.asScala)
-  }
-
 }
diff --git a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
index a693471b7b3..3f07c0b7547 100644
--- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.storage.internals.log.LogConfig;
 
-import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
@@ -125,8 +124,8 @@ public class KRaftConfigs {
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
            .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
            .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
-            .define(PROCESS_ROLES_CONFIG, LIST, Collections.emptyList(), ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
-            .define(NODE_ID_CONFIG, INT, EMPTY_NODE_ID, null, HIGH, NODE_ID_DOC)
+            .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
+            .define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC)
            .define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
            .define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC)
            .define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)

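The behavioral change comes from the two rewritten define() calls above: replacing the old defaults with ConfigDef.NO_DEFAULT_VALUE is what turns the missing-config case into "Missing required configuration ... which has no default value.", and atLeast(0) on node.id is what rejects negative IDs. A small standalone sketch of that ConfigDef behavior (the one-entry definition and doc string below are illustrative, not the real KRaftConfigs.CONFIG_DEF):

    import java.util.Collections
    import org.apache.kafka.common.config.{ConfigDef, ConfigException}
    import org.apache.kafka.common.config.ConfigDef.{Importance, Range, Type}

    // Mirrors the new node.id definition: no default value and a validator requiring >= 0.
    val nodeIdDef = new ConfigDef()
      .define("node.id", Type.INT, ConfigDef.NO_DEFAULT_VALUE, Range.atLeast(0), Importance.HIGH, "The node ID")

    // Throws: Missing required configuration "node.id" which has no default value.
    try nodeIdDef.parse(Collections.emptyMap())
    catch { case e: ConfigException => println(e.getMessage) }

    // Throws: Invalid value -1 for configuration node.id: Value must be at least 0
    try nodeIdDef.parse(Collections.singletonMap("node.id", "-1"))
    catch { case e: ConfigException => println(e.getMessage) }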