This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 4d0216dd9f0 KAFKA-18281: Kafka is improperly validating non-advertised 
listeners for routable controller addresses (#18387)
4d0216dd9f0 is described below

commit 4d0216dd9f0d1c125313e04521726f6934f056e0
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Feb 25 10:51:28 2025 +0800

    KAFKA-18281: Kafka is improperly validating non-advertised listeners for 
routable controller addresses (#18387)
    
    When a cluster is configured with a dynamic controller quorum, KRaft 
replica's endpoint are computed using the advertised.listeners property and not 
the quorum.controller.voters property. This change in the configuration makes 
it difficult to keeping all previous node configurations compatible with the 
new endpoint discovery functionality.
    
    The least intrusive solution is to rely on Kafka's reverse hostname lookup 
when the hostname is not specified. The effective advertised controller 
listener now remove '0.0.0.0' hostname if the endpoint came from the listener 
configuration and not the advertised.listener configuration.
    
    Reviewers: José Armando García Sancio <[email protected]>, Alyssa Huang 
<[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 39 +++++++----
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 76 ++++++++++++++++++++++
 2 files changed, 103 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7992f833bc3..e22cc103408 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -747,17 +747,42 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   }
 
   def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
-    val controllerAdvertisedListeners = advertisedListeners.filter(l => 
controllerListenerNames.contains(l.listenerName.value()))
+    val advertisedListenersProp = 
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
+      CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+        .filter(l => controllerListenerNames.contains(l.listenerName.value()))
+    } else {
+      Seq.empty
+    }
     val controllerListenersValue = controllerListeners
 
     controllerListenerNames.flatMap { name =>
       controllerAdvertisedListeners
         .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
-        .orElse(controllerListenersValue.find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name))))
+        .orElse(
+          // If users don't define advertised.listeners, the advertised 
controller listeners inherit from listeners configuration
+          // which match listener names in controller.listener.names.
+          // Removing "0.0.0.0" host to avoid validation errors. This is to be 
compatible with the old behavior before 3.9.
+          // The null or "" host does a reverse lookup in 
ListenerInfo#withWildcardHostnamesResolved.
+          controllerListenersValue
+            .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
+            .map(endpoint => if (endpoint.host == "0.0.0.0") {
+              new EndPoint(null, endpoint.port, endpoint.listenerName, 
endpoint.securityProtocol)
+            } else {
+              endpoint
+            })
+        )
     }
   }
 
   def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
+    val advertisedListenersProp = 
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    val advertisedListeners = if (advertisedListenersProp != null) {
+      CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+    } else {
+      listeners
+    }
+    // Only expose broker listeners
     advertisedListeners.filter(l => {
       if (!controllerListenerNames.contains(l.listenerName.value())) {
         true
@@ -771,16 +796,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     })
   }
 
-  // Use advertised listeners if defined, fallback to listeners otherwise
-  private def advertisedListeners: Seq[EndPoint] = {
-    val advertisedListenersProp = 
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
-    if (advertisedListenersProp != null) {
-      CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
-    } else {
-      listeners
-    }
-  }
-
   private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, 
SecurityProtocol) = {
     Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG)) 
match {
       case Some(_) if 
originals.containsKey(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG) 
=>
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 541080aee8a..9c924297c12 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1350,6 +1350,82 @@ class KafkaConfigTest {
     KafkaConfig.fromProps(props)
   }
 
+  @Test
+  def testImplicitAllBindingListenersCanBeAdvertisedForBroker(): Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    val listeners = "PLAINTEXT://:9092"
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners) // explicitly setting it in broker
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
+
+    // Valid
+    KafkaConfig.fromProps(props)
+
+    // Also valid if we allow advertised listeners to derive from listeners
+    props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def testExplicitAllBindingListenersCannotBeUsedForBroker(): Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+    val listeners = "PLAINTEXT://0.0.0.0:9092"
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners) // explicitly setting it in KRaft
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
+
+    val expectedExceptionContainsText = "advertised.listeners cannot use the 
nonroutable meta-address 0.0.0.0. Use a routable IP address."
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // invalid if we allow advertised listeners to derive from listeners
+    props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+  }
+
+  @Test
+  def 
testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): 
Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    val listeners = "CONTROLLER://:9093"
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners) // explicitly setting it in KRaft
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+
+    // Valid
+    KafkaConfig.fromProps(props)
+
+    // Also valid if we allow advertised listeners to derive from 
listeners/controller.listener.names
+    props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    KafkaConfig.fromProps(props)
+  }
+
+  @Test
+  def 
testExplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController(): 
Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    val listeners = "CONTROLLER://0.0.0.0:9093"
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+    props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
listeners) // explicitly setting it in KRaft
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+
+    val expectedExceptionContainsText = "advertised.listeners cannot use the 
nonroutable meta-address 0.0.0.0. Use a routable IP address."
+    assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+    // Valid if we allow advertised listeners to derive from 
listeners/controller.listener.names
+    props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+    KafkaConfig.fromProps(props)
+  }
+
   @Test
   def testControllerListenersCanBeAdvertisedForKRaftCombined(): Unit = {
     val props = new Properties()

Reply via email to