This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 56cfd3b4b72 KAFKA-20380; backwards compatible advertised.listeners 
when it is not defined (#22218)
56cfd3b4b72 is described below

commit 56cfd3b4b725a22ef997a7ac398e0a076888038b
Author: Kevin Wu <[email protected]>
AuthorDate: Thu May 7 04:49:22 2026 -0700

    KAFKA-20380; backwards compatible advertised.listeners when it is not 
defined (#22218)
    
    Kafka can incorrectly resolve the advertised listener for controllers if
    it is not specified. For controller configurations that specify
    controller.quorum.voters but not advertised.listeners, Kafka can
    deduce the advertised listener for the default controller listener (the
    first entry in controller.listener.names). In this case, when that
    listener has no host or binds to 0.0.0.0, Kafka will automatically set
    its advertised listener to the endpoint specified for this node in
    controller.quorum.voters.
    
    Reviewers: José Armando García Sancio <[email protected]>
    Co-authored-by: José Armando García Sancio <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 49 ++++++++++++++++------
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 32 ++++++++++----
 .../java/org/apache/kafka/raft/QuorumConfig.java   |  2 +-
 3 files changed, 62 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 37f60691c6e..f3208d3c4f1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -528,22 +528,47 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     }
     val controllerListenersValue = controllerListeners
 
-    controllerListenerNames.flatMap { name =>
+    def nameToEndpoint(name: String, isDefault: Boolean): Option[EndPoint] = {
       controllerAdvertisedListeners
         .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
-        .orElse(
-          // If users don't define advertised.listeners, the advertised 
controller listeners inherit from listeners configuration
-          // which match listener names in controller.listener.names.
-          // Removing "0.0.0.0" host to avoid validation errors. This is to be 
compatible with the old behavior before 3.9.
-          // The null or "" host does a reverse lookup in 
ListenerInfo#withWildcardHostnamesResolved.
+        .orElse {
           controllerListenersValue
             .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
-            .map(endpoint => if (endpoint.host == "0.0.0.0") {
-              new EndPoint(null, endpoint.port, endpoint.listenerName, 
endpoint.securityProtocol)
-            } else {
-              endpoint
-            })
-        )
+            .map { endpoint =>
+              val voterListenerOverride = {
+                // the user did not provide an advertised listener for the 
default controller listener;
+                // if controller.quorum.voters defines an endpoint for this 
node, use that as the advertised listener
+                if (isDefault && (endpoint.host == null || endpoint.host == 
"0.0.0.0")) {
+                  val votersAddress = 
QuorumConfig.parseVoterConnections(quorumConfig.voters).asScala.get(nodeId)
+                  votersAddress.map { socketAddress =>
+                    new EndPoint(
+                      socketAddress.getHostString,
+                      socketAddress.getPort,
+                      endpoint.listenerName,
+                      endpoint.securityProtocol
+                    )
+                  }
+                } else {
+                  None
+                }
+              }
+              voterListenerOverride.getOrElse {
+                // Removing "0.0.0.0" host to avoid validation errors.
+                // This is to be compatible with the old behavior before 3.9.
+                // The null or "" host does a reverse lookup in 
ListenerInfo#withWildcardHostnamesResolved.
+                if (endpoint.host == "0.0.0.0") {
+                  new EndPoint(null, endpoint.port, endpoint.listenerName, 
endpoint.securityProtocol)
+                } else {
+                  endpoint
+                }
+              }
+            }
+        }
+    }
+
+    controllerListenerNames.toList match {
+      case Nil => Nil
+      case head :: tail => nameToEndpoint(head, true).toList ++ 
tail.flatMap(nameToEndpoint(_, false))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 70111b5fde8..89de26cf47f 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -348,17 +348,33 @@ class KafkaConfigTest {
   }
 
   @Test
-  def 
testEffectAdvertiseControllerListenerForControllerWithoutAdvertisement(): Unit 
= {
+  def 
testEffectAdvertiseControllerListenerForControllerWithoutAdvertisementAndVotersConfig():
 Unit = {
     val props = new Properties()
     props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"CONTROLLER://localhost:9093")
     props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
-    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, 
"[email protected]:9092")
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"CONTROLLER://:9093")
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(
-      Seq(EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), 
SecurityProtocol.PLAINTEXT)),
+      Seq(EndPoint("lb1.example.com", 9092, 
ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
+      config.effectiveAdvertisedControllerListeners
+    )
+  }
+
+  @Test
+  def 
testEffectAdvertiseControllerListenerForControllerWithoutAdvertisementOrVotersConfig():
 Unit = {
+    val props = new Properties()
+    props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+    props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER")
+    props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092")
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"CONTROLLER://:9093")
+
+    val config = KafkaConfig.fromProps(props)
+    assertEquals(
+      Seq(EndPoint(null, 9093, ListenerName.normalised("CONTROLLER"), 
SecurityProtocol.PLAINTEXT)),
       config.effectiveAdvertisedControllerListeners
     )
   }
@@ -367,17 +383,17 @@ class KafkaConfigTest {
   def 
testEffectAdvertiseControllerListenerForControllerWithMixedAdvertisement(): 
Unit = {
     val props = new Properties()
     props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
-    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"CONTROLLER://localhost:9093,CONTROLLER_NEW://localhost:9094")
     props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
-    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
     props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"CONTROLLER,CONTROLLER_NEW")
     props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"CONTROLLER://lb1.example.com:9000")
+    props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, 
"[email protected]:9092")
+    props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"CONTROLLER://:9093,CONTROLLER_NEW://:9094")
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(
       Seq(
         EndPoint("lb1.example.com", 9000, 
ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT),
-        EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), 
SecurityProtocol.PLAINTEXT)
+        EndPoint(null, 9094, ListenerName.normalised("CONTROLLER_NEW"), 
SecurityProtocol.PLAINTEXT)
       ),
       config.effectiveAdvertisedControllerListeners
     )
@@ -1418,7 +1434,7 @@ class KafkaConfigTest {
   @Test
   def testValidQuorumVotersParsingWithIpAddress(): Unit = {
     val expected = new util.HashMap[Integer, InetSocketAddress]()
-    expected.put(1, new InetSocketAddress("127.0.0.1", 9092))
+    expected.put(1, InetSocketAddress.createUnresolved("127.0.0.1", 9092))
     assertValidQuorumVoters(expected, "[email protected]:9092")
   }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 1a7fff83ee6..77ad32a2a68 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -204,7 +204,7 @@ public class QuorumConfig {
                     + ". Each entry should be in the form 
`{id}@{host}:{port}`.");
             }
 
-            InetSocketAddress address = new InetSocketAddress(host, port);
+            InetSocketAddress address = 
InetSocketAddress.createUnresolved(host, port);
             if (address.getHostString().equals(NON_ROUTABLE_HOST) && 
requireRoutableAddresses) {
                 throw new ConfigException(
                     String.format("Host string (%s) is not routeable", 
address.getHostString())

Reply via email to