This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 32064d5b42e KAFKA-18305: validate controller.listener.names is not in
inter.broker.listener.name for kcontrollers (#18222)
32064d5b42e is described below
commit 32064d5b42ebbc17df75745021ddd746ee7f655d
Author: kevin-wu24 <[email protected]>
AuthorDate: Thu Dec 19 15:29:38 2024 -0600
KAFKA-18305: validate controller.listener.names is not in
inter.broker.listener.name for kcontrollers (#18222)
When inter.broker.listener.name is explicitly set on a KRaft controller
(process.roles=controller), validate that its value is not in the set of
controller.listener.names.
Reviewers: Colin P. McCabe <[email protected]>, David Arthur
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 7 +++++++
.../scala/unit/kafka/server/ControllerApisTest.scala | 4 +++-
.../test/scala/unit/kafka/server/KafkaConfigTest.scala | 17 +++++++++++++++++
.../test/scala/unit/kafka/tools/StorageToolTest.scala | 4 +++-
4 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9c1942783db..ed117fde1c5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -695,6 +695,13 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
effectiveAdvertisedControllerListeners.size == listeners.size,
s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain
KRaft controller listeners from
${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when
${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
)
+ // controller.listener.names must not contain inter.broker.listener.name
when inter.broker.listener.name is explicitly set
+ if
(Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)).isDefined)
{
+ require(
+ !controllerListenerNames.contains(interBrokerListenerName.value()),
+ s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain
an explicitly set ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG}
configuration value when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller'"
+ )
+ }
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
validateAdvertisedControllerListenersNonEmptyForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index e8427bc10db..f2324bc7702 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -52,6 +52,7 @@ import org.apache.kafka.common.{ElectionType, Uuid}
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.controller.{Controller, ControllerRequestContext,
ResultOrError}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
+import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult, Authorizer}
@@ -155,7 +156,8 @@ class ControllerApisTest {
throttle: Boolean = false): ControllerApis
= {
props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId: java.lang.Integer)
props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
- props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "PLAINTEXT")
+ props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+ props.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9092")
new ControllerApis(
requestChannel,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 23bbe1b6e98..f2f987cb214 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1315,6 +1315,23 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props)
}
+ @Test
+ def testControllerListenerNamesValidForKRaftControllerOnly(): Unit = {
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9092")
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"SASL_SSL://:9092,CONTROLLER://:9093")
+
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"SASL_SSL:SASL_SSL,CONTROLLER:SASL_SSL")
+ props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SASL_SSL")
+ props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER,SASL_SSL")
+
+ val expectedExceptionContainsText =
+ """controller.listener.names must not contain an explicitly set
inter.broker.listener.name configuration value
+ |when process.roles=controller""".stripMargin.replaceAll("\n", " ")
+ assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+ }
+
@Test
def testControllerQuorumVoterStringsToNodes(): Unit = {
assertThrows(classOf[ConfigException], () =>
QuorumConfig.quorumVoterStringsToNodes(Collections.singletonList("")))
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index edfb0c68c0f..9355a65ceb2 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.server.common.{Feature,
MetadataVersion}
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble,
PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
+import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs,
ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows, assertTrue}
@@ -50,7 +51,8 @@ class StorageToolTest {
properties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
properties.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
properties.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG,
s"2@localhost:9092")
- properties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"PLAINTEXT")
+ properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+ properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
properties
}