This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 14daa23b590 KAFKA-18331: Make process.roles and node.id required configs (#18414) 14daa23b590 is described below commit 14daa23b590304e0a224b24849a6eddac77b0973 Author: PoAn Yang <pay...@apache.org> AuthorDate: Thu Jan 16 15:55:51 2025 +0800 KAFKA-18331: Make process.roles and node.id required configs (#18414) In 4.0, there is no ZK mode and both of these configs are required in kraft mode. Reviewers: Ismael Juma <ism...@juma.me.uk> --- core/src/main/scala/kafka/server/KafkaConfig.scala | 16 ---- .../test/scala/unit/kafka/KafkaConfigTest.scala | 60 ++++++++++++ .../scala/unit/kafka/server/KafkaConfigTest.scala | 103 +++++++++++++-------- .../test/scala/unit/kafka/server/ServerTest.scala | 21 +---- .../apache/kafka/server/config/KRaftConfigs.java | 5 +- 5 files changed, 126 insertions(+), 79 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a8b0cf4328a..e243d40bbb2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -636,22 +636,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) if (nodeId != brokerId) { throw new ConfigException(s"You must set `${KRaftConfigs.NODE_ID_CONFIG}` to the same value as `${ServerConfigs.BROKER_ID_CONFIG}`.") } - if (requiresZookeeper) { - if (zkConnect == null) { - throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_CONFIG}` which has no default value.") - } - if (brokerIdGenerationEnable) { - require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id") - } else { - require(brokerId >= 0, "broker.id must be greater than or equal to 0") - } - } else { - // KRaft-based metadata quorum - if (nodeId < 0) { - throw new ConfigException(s"Missing configuration 
`${KRaftConfigs.NODE_ID_CONFIG}` which is required " + - s"when `process.roles` is defined (i.e. when running in KRaft mode).") - } - } require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1") require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0") require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1") diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index fc46b22a38a..8834f6f3608 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -41,6 +41,66 @@ class KafkaConfigTest { @AfterEach def tearDown(): Unit = Exit.resetExitProcedure() + @Test + def testBrokerRequiredProperties(): Unit = { + val properties = new Properties() + assertBadConfigContainingMessage(properties, + "Missing required configuration \"process.roles\" which has no default value.") + + properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + assertBadConfigContainingMessage(properties, + "Missing required configuration \"node.id\" which has no default value.") + + properties.put(KRaftConfigs.NODE_ID_CONFIG, -1) + assertBadConfigContainingMessage(properties, + "Invalid value -1 for configuration node.id: Value must be at least 0") + + properties.put(KRaftConfigs.NODE_ID_CONFIG, 0) + assertBadConfigContainingMessage(properties, + "If using process.roles, either controller.quorum.bootstrap.servers must contain the set of bootstrap controllers or controller.quorum.voters must contain a parseable set of controllers.") + + properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + assertBadConfigContainingMessage(properties, + "requirement failed: controller.listener.names must contain at least one value when running KRaft with just the broker role") + + 
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + KafkaConfig.fromProps(properties) + } + + @Test + def testControllerRequiredProperties(): Unit = { + val properties = new Properties() + assertBadConfigContainingMessage(properties, + "Missing required configuration \"process.roles\" which has no default value.") + + properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + assertBadConfigContainingMessage(properties, + "Missing required configuration \"node.id\" which has no default value.") + + properties.put(KRaftConfigs.NODE_ID_CONFIG, -1) + assertBadConfigContainingMessage(properties, + "Invalid value -1 for configuration node.id: Value must be at least 0") + + properties.put(KRaftConfigs.NODE_ID_CONFIG, 0) + assertBadConfigContainingMessage(properties, + "If using process.roles, either controller.quorum.bootstrap.servers must contain the set of bootstrap controllers or controller.quorum.voters must contain a parseable set of controllers.") + + properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + assertBadConfigContainingMessage(properties, + "requirement failed: The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller") + + properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092") + assertBadConfigContainingMessage(properties, + "No security protocol defined for listener CONTROLLER") + + properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT") + assertBadConfigContainingMessage(properties, + "requirement failed: The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller") + + properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + KafkaConfig.fromProps(properties) + } + @Test def testGetKafkaConfigFromArgs(): Unit = { val propertiesFile = prepareDefaultConfig() diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2926b047180..971018ffc19 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -153,19 +153,22 @@ class KafkaConfigTest { @Test def testAdvertiseDefaults(): Unit = { - val port = 9999 + val brokerProt = 9999 + val controllerPort = 10000 val hostName = "fake-host" val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") - props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$port") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, s"$hostName:$controllerPort") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, s"PLAINTEXT://$hostName:$brokerProt") val serverConfig = KafkaConfig.fromProps(props) val endpoints = serverConfig.effectiveAdvertisedBrokerListeners assertEquals(1, endpoints.size) val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get assertEquals(endpoint.host, hostName) - assertEquals(endpoint.port, port) + assertEquals(endpoint.port, brokerProt) } @Test @@ -187,8 +190,10 @@ class KafkaConfigTest { @Test def testDuplicateListeners(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9095") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") // listeners with duplicate port props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://localhost:9091,SSL://localhost:9091") @@ -199,7 +204,7 @@ class KafkaConfigTest { assertBadConfigContainingMessage(props, "Each listener must have a different name") // advertised listeners can have duplicate ports - props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "HOST:SASL_SSL,LB:SASL_SSL") + props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "HOST:SASL_SSL,LB:SASL_SSL,CONTROLLER:SASL_SSL") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "HOST") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "HOST://localhost:9091,LB://localhost:9092") props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "HOST://localhost:9091,LB://localhost:9091") @@ -213,8 +218,11 @@ class KafkaConfigTest { @Test def testIPv4AndIPv6SamePortListeners(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.put(ServerConfigs.BROKER_ID_CONFIG, "1") - props.put(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9091") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT + ",CONTROLLER:PLAINTEXT") props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://[::1]:9092,SSL://[::1]:9092") var caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)) @@ -464,24 +472,13 @@ class KafkaConfigTest { KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER2"))) } - @Test - def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = { - val props = new Properties() - props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") - 
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://localhost:9092") - assertBadConfigContainingMessage(props, - "Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER") - // Valid now - props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092") - assertEquals(None, KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER"))) - } - @Test def testBadListenerProtocol(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "BAD://localhost:9091") assertFalse(isValidKafkaConfig(props)) @@ -490,11 +487,13 @@ class KafkaConfigTest { @Test def testListenerNamesWithAdvertisedListenerUnset(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093") - props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT") + props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT") 
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION") val config = KafkaConfig.fromProps(props) val expectedListeners = Seq( @@ -506,7 +505,8 @@ class KafkaConfigTest { val expectedSecurityProtocolMap = Map( new ListenerName("CLIENT") -> SecurityProtocol.SSL, new ListenerName("REPLICATION") -> SecurityProtocol.SSL, - new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT + new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT, + new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT ) assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap) } @@ -514,12 +514,14 @@ class KafkaConfigTest { @Test def testListenerAndAdvertisedListenerNames(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093") props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093") - props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,INTERNAL:PLAINTEXT") + props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "INTERNAL") val config = KafkaConfig.fromProps(props) @@ -537,7 +539,8 @@ class KafkaConfigTest { val expectedSecurityProtocolMap = Map( new ListenerName("EXTERNAL") -> SecurityProtocol.SSL, - new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT + new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT, + 
new ListenerName("CONTROLLER") -> SecurityProtocol.PLAINTEXT ) assertEquals(expectedSecurityProtocolMap, config.effectiveListenerSecurityProtocolMap) } @@ -579,9 +582,12 @@ class KafkaConfigTest { @Test def testCaseInsensitiveListenerProtocol(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9093") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092") + props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT") val config = KafkaConfig.fromProps(props) assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString)) assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString)) @@ -594,8 +600,10 @@ class KafkaConfigTest { @Test def testListenerDefaults(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9093") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") // configuration with no listeners val conf = KafkaConfig.fromProps(props) @@ -607,10 +615,12 @@ class KafkaConfigTest { @Test def testVersionConfiguration(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - 
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") val conf = KafkaConfig.fromProps(props) - assertEquals(MetadataVersion.latestProduction, conf.interBrokerProtocolVersion) + assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, conf.interBrokerProtocolVersion) props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.0-IV1") val conf2 = KafkaConfig.fromProps(props) @@ -773,7 +783,10 @@ class KafkaConfigTest { def testFromPropsInvalid(): Unit = { def baseProperties: Properties = { val validRequiredProperties = new Properties() - validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181") + validRequiredProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + validRequiredProperties.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") + validRequiredProperties.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + validRequiredProperties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") validRequiredProperties } // to ensure a basis is valid - bootstraps all needed validation @@ -1070,7 +1083,10 @@ class KafkaConfigTest { def testDynamicLogConfigs(): Unit = { def baseProperties: Properties = { val validRequiredProperties = new Properties() - validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181") + validRequiredProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + validRequiredProperties.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") + validRequiredProperties.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9093") + validRequiredProperties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") validRequiredProperties } @@ -1161,7 +1177,9 @@ class KafkaConfigTest { @Test def testSpecificProperties(): Unit = { val 
defaults = new Properties() - defaults.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181") + defaults.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") + defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") // For ZkConnectionTimeoutMs defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234") defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false") @@ -1180,7 +1198,6 @@ class KafkaConfigTest { defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString) val config = KafkaConfig.fromProps(defaults) - assertEquals("127.0.0.1:2181", config.zkConnect) assertEquals(1234, config.zkConnectionTimeoutMs) assertEquals(false, config.brokerIdGenerationEnable) assertEquals(1, config.maxReservedBrokerId) @@ -1207,9 +1224,12 @@ class KafkaConfigTest { @Test def testNonroutableAdvertisedListeners(): Unit = { val props = new Properties() - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181") + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://0.0.0.0:9092") - assertFalse(isValidKafkaConfig(props)) + assertBadConfigContainingMessage(props, "advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. 
Use a routable IP address.") } @Test @@ -1513,6 +1533,7 @@ class KafkaConfigTest { @Test def testNodeIdMustNotBeDifferentThanBrokerId(): Unit = { val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") assertEquals("You must set `node.id` to the same value as `broker.id`.", @@ -1524,8 +1545,7 @@ class KafkaConfigTest { val props = new Properties() props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093") - assertEquals("Missing configuration `node.id` which is required when `process.roles` " + - "is defined (i.e. when running in KRaft mode).", + assertEquals("Missing required configuration \"node.id\" which has no default value.", assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)).getMessage()) } @@ -1568,7 +1588,10 @@ class KafkaConfigTest { @Test def testSaslJwksEndpointRetryDefaults(): Unit = { val props = new Properties() - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker") + props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") + props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") val config = KafkaConfig.fromProps(props) assertNotNull(config.getLong(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS)) assertNotNull(config.getLong(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS)) diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala index 62345c446e2..4b2b900b375 100644 --- a/core/src/test/scala/unit/kafka/server/ServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala @@ -20,7 +20,7 @@ import 
java.util.Properties import org.apache.kafka.common.Uuid import org.apache.kafka.common.metrics.MetricsContext import org.apache.kafka.raft.QuorumConfig -import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ZkConfigs} +import org.apache.kafka.server.config.KRaftConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -47,23 +47,4 @@ class ServerTest { Server.NodeIdLabel -> nodeId.toString ), context.contextLabels.asScala) } - - @Test - def testCreateZkKafkaMetricsContext(): Unit = { - val brokerId = 0 - val clusterId = Uuid.randomUuid().toString - - val props = new Properties() - props.put(ServerConfigs.BROKER_ID_CONFIG, brokerId.toString) - props.put(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:0") - val config = KafkaConfig.fromProps(props) - - val context = Server.createKafkaMetricsContext(config, clusterId) - assertEquals(Map( - MetricsContext.NAMESPACE -> Server.MetricsPrefix, - Server.ClusterIdLabel -> clusterId, - Server.BrokerIdLabel -> brokerId.toString - ), context.contextLabels.asScala) - } - } diff --git a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java index a693471b7b3..3f07c0b7547 100644 --- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.record.Records; import org.apache.kafka.storage.internals.log.LogConfig; -import java.util.Collections; import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; @@ -125,8 +124,8 @@ public class KRaftConfigs { public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC) 
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC) - .define(PROCESS_ROLES_CONFIG, LIST, Collections.emptyList(), ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC) - .define(NODE_ID_CONFIG, INT, EMPTY_NODE_ID, null, HIGH, NODE_ID_DOC) + .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC) + .define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC) .define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC) .define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC) .define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)