This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 84b51283d69 KAFKA-20380; backwards compatible advertised.listeners
when it is not defined (#21918)
84b51283d69 is described below
commit 84b51283d69c072a0a37becc99b0e57f39344bba
Author: Kevin Wu <[email protected]>
AuthorDate: Tue May 5 14:41:49 2026 -0500
KAFKA-20380; backwards compatible advertised.listeners when it is not
defined (#21918)
Kafka can incorrectly resolve the advertised listener for controllers if
it is not specified. For controller configurations that specify
controller.quorum.voters but not advertised.listeners, Kafka can
deduce the advertised listener for the default (first) controller
listener. In this case, Kafka will automatically set the advertised
listener for the default controller listener to the endpoint specified
for this node in controller.quorum.voters.
Reviewers: José Armando García Sancio <[email protected]>
Co-authored-by: José Armando García Sancio <[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 49 ++++++++++++++++------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 32 ++++++++++----
.../java/org/apache/kafka/raft/QuorumConfig.java | 2 +-
3 files changed, 62 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c4268d0f104..c7f27adecc3 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -473,22 +473,47 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
val controllerListenersValue = controllerListeners
- controllerListenerNames.asScala.flatMap { name =>
+ def nameToEndpoint(name: String, isDefault: Boolean): Option[Endpoint] = {
controllerAdvertisedListeners
.find(endpoint =>
ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
- .orElse(
- // If users don't define advertised.listeners, the advertised
controller listeners inherit from listeners configuration
- // which match listener names in controller.listener.names.
- // Removing "0.0.0.0" host to avoid validation errors. This is to be
compatible with the old behavior before 3.9.
- // The null or "" host does a reverse lookup in
ListenerInfo#withWildcardHostnamesResolved.
+ .orElse {
controllerListenersValue
.find(endpoint =>
ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
- .map(endpoint => if (endpoint.host == "0.0.0.0") {
- new Endpoint(endpoint.listener, endpoint.securityProtocol, null,
endpoint.port)
- } else {
- endpoint
- })
- )
+ .map { endpoint =>
+ val voterListenerOverride = {
+ // the user did not provide an advertised listener for the
default controller listener;
+ // if controller.quorum.voters defines an endpoint for this
node, use that as the advertised listener
+ if (isDefault && (endpoint.host == null || endpoint.host ==
"0.0.0.0")) {
+ val votersAddress =
QuorumConfig.parseVoterConnections(quorumConfig.voters).asScala.get(nodeId())
+ votersAddress.map { socketAddress =>
+ new Endpoint(
+ endpoint.listener,
+ endpoint.securityProtocol,
+ socketAddress.getHostString,
+ socketAddress.getPort
+ )
+ }
+ } else {
+ None
+ }
+ }
+ voterListenerOverride.getOrElse {
+ // Removing "0.0.0.0" host to avoid validation errors.
+ // This is to be compatible with the old behavior before 3.9.
+ // The null or "" host does a reverse lookup in
ListenerInfo#withWildcardHostnamesResolved.
+ if (endpoint.host == "0.0.0.0") {
+ new Endpoint(endpoint.listener, endpoint.securityProtocol,
null, endpoint.port)
+ } else {
+ endpoint
+ }
+ }
+ }
+ }
+ }
+
+ controllerListenerNames.asScala.toList match {
+ case Nil => Nil
+ case head :: tail => nameToEndpoint(head, true).toList ++
tail.flatMap(nameToEndpoint(_, false))
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index cf9063a3c2b..4cf4d349e4a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -349,17 +349,33 @@ class KafkaConfigTest {
}
@Test
- def
testEffectAdvertiseControllerListenerForControllerWithoutAdvertisement(): Unit
= {
+ def
testEffectAdvertiseControllerListenerForControllerWithoutAdvertisementAndVotersConfig():
Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"CONTROLLER://localhost:9093")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
- props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG,
"[email protected]:9092")
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"CONTROLLER://:9093")
val config = KafkaConfig.fromProps(props)
assertEquals(
- Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost",
9093)),
+ Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT,
"lb1.example.com", 9092)),
+ config.effectiveAdvertisedControllerListeners
+ )
+ }
+
+ @Test
+ def
testEffectAdvertiseControllerListenerForControllerWithoutAdvertisementOrVotersConfig():
Unit = {
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+ props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+ props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER")
+ props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092")
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"CONTROLLER://:9093")
+
+ val config = KafkaConfig.fromProps(props)
+ assertEquals(
+ Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, null, 9093)),
config.effectiveAdvertisedControllerListeners
)
}
@@ -368,17 +384,17 @@ class KafkaConfigTest {
def
testEffectAdvertiseControllerListenerForControllerWithMixedAdvertisement():
Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
- props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"CONTROLLER://localhost:9093,CONTROLLER_NEW://localhost:9094")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
- props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER,CONTROLLER_NEW")
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"CONTROLLER://lb1.example.com:9000")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG,
"[email protected]:9092")
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"CONTROLLER://:9093,CONTROLLER_NEW://:9094")
val config = KafkaConfig.fromProps(props)
assertEquals(
Seq(
new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT,
"lb1.example.com", 9000),
- new Endpoint("CONTROLLER_NEW", SecurityProtocol.PLAINTEXT,
"localhost", 9094)
+ new Endpoint("CONTROLLER_NEW", SecurityProtocol.PLAINTEXT, null, 9094)
),
config.effectiveAdvertisedControllerListeners
)
@@ -1470,7 +1486,7 @@ class KafkaConfigTest {
@Test
def testValidQuorumVotersParsingWithIpAddress(): Unit = {
val expected = new util.HashMap[Integer, InetSocketAddress]()
- expected.put(1, new InetSocketAddress("127.0.0.1", 9092))
+ expected.put(1, InetSocketAddress.createUnresolved("127.0.0.1", 9092))
assertValidQuorumVoters(expected, "[email protected]:9092")
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 3ecc1b2c190..a52d676b26a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -243,7 +243,7 @@ public class QuorumConfig {
+ ". Each entry should be in the form
`{id}@{host}:{port}`.");
}
- InetSocketAddress address = new InetSocketAddress(host, port);
+ InetSocketAddress address =
InetSocketAddress.createUnresolved(host, port);
if (address.getHostString().equals(NON_ROUTABLE_HOST) &&
requireRoutableAddresses) {
throw new ConfigException(
String.format("Host string (%s) is not routeable",
address.getHostString())