This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 32064d5b42e KAFKA-18305: validate controller.listener.names is not in 
inter.broker.listener.name for kcontrollers (#18222)
32064d5b42e is described below

commit 32064d5b42ebbc17df75745021ddd746ee7f655d
Author: kevin-wu24 <[email protected]>
AuthorDate: Thu Dec 19 15:29:38 2024 -0600

    KAFKA-18305: validate controller.listener.names is not in 
inter.broker.listener.name for kcontrollers (#18222)
    
    When inter.broker.listener.name is explicitly set, validate that it is not 
in the set of controller.listener.names.
    
    Reviewers: Colin P. McCabe <[email protected]>, David Arthur 
<[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala      |  7 +++++++
 .../scala/unit/kafka/server/ControllerApisTest.scala    |  4 +++-
 .../test/scala/unit/kafka/server/KafkaConfigTest.scala  | 17 +++++++++++++++++
 .../test/scala/unit/kafka/tools/StorageToolTest.scala   |  4 +++-
 4 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9c1942783db..ed117fde1c5 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -695,6 +695,13 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
         effectiveAdvertisedControllerListeners.size == listeners.size,
         s"The ${SocketServerConfigs.LISTENERS_CONFIG} config must only contain 
KRaft controller listeners from 
${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} when 
${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
       )
+      // controller.listener.names must not contain inter.broker.listener.name 
when inter.broker.listener.name is explicitly set
+      if 
(Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)).isDefined)
 {
+        require(
+          !controllerListenerNames.contains(interBrokerListenerName.value()),
+          s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must not contain 
an explicitly set ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} 
configuration value when ${KRaftConfigs.PROCESS_ROLES_CONFIG}=controller"
+        )
+      }
       validateControllerQuorumVotersMustContainNodeIdForKRaftController()
       validateAdvertisedControllerListenersNonEmptyForKRaftController()
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index e8427bc10db..f2324bc7702 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -52,6 +52,7 @@ import org.apache.kafka.common.{ElectionType, Uuid}
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
 import org.apache.kafka.controller.{Controller, ControllerRequestContext, 
ResultOrError}
 import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
+import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult, Authorizer}
@@ -155,7 +156,8 @@ class ControllerApisTest {
                                    throttle: Boolean = false): ControllerApis 
= {
     props.put(KRaftConfigs.NODE_ID_CONFIG, nodeId: java.lang.Integer)
     props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
-    props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "PLAINTEXT")
+    props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+    props.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
     props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9092")
     new ControllerApis(
       requestChannel,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 23bbe1b6e98..f2f987cb214 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1315,6 +1315,23 @@ class KafkaConfigTest {
     KafkaConfig.fromProps(props)
   }
 
+  @Test
+  def testControllerListenerNamesValidForKRaftControllerOnly(): Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9092")
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"SASL_SSL://:9092,CONTROLLER://:9093")
+    
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"SASL_SSL:SASL_SSL,CONTROLLER:SASL_SSL")
+    props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SASL_SSL")
+    props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER,SASL_SSL")
+
+    val expectedExceptionContainsText =
+      """controller.listener.names must not contain an explicitly set 
inter.broker.listener.name configuration value
+        |when process.roles=controller""".stripMargin.replaceAll("\n", " ")
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+  }
+
   @Test
   def testControllerQuorumVoterStringsToNodes(): Unit = {
     assertThrows(classOf[ConfigException], () => 
QuorumConfig.quorumVoterStringsToNodes(Collections.singletonList("")))
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index edfb0c68c0f..9355a65ceb2 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.server.common.{Feature, 
MetadataVersion}
 import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
 import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, 
PropertiesUtils}
 import org.apache.kafka.metadata.storage.FormatterException
+import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
@@ -50,7 +51,8 @@ class StorageToolTest {
     properties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
     properties.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
     properties.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, 
s"2@localhost:9092")
-    properties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"PLAINTEXT")
+    properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
+    properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
     properties
   }
 

Reply via email to