This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 4d0216dd9f0 KAFKA-18281: Kafka is improperly validating non-advertised
listeners for routable controller addresses (#18387)
4d0216dd9f0 is described below
commit 4d0216dd9f0d1c125313e04521726f6934f056e0
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Feb 25 10:51:28 2025 +0800
KAFKA-18281: Kafka is improperly validating non-advertised listeners for
routable controller addresses (#18387)
When a cluster is configured with a dynamic controller quorum, a KRaft
replica's endpoints are computed using the advertised.listeners property and not
the controller.quorum.voters property. This change in the configuration makes
it difficult to keep all previous node configurations compatible with the
new endpoint discovery functionality.
The least intrusive solution is to rely on Kafka's reverse hostname lookup
when the hostname is not specified. The effective advertised controller
listeners now remove the '0.0.0.0' hostname if the endpoint came from the
listeners configuration and not the advertised.listeners configuration.
Reviewers: José Armando García Sancio <[email protected]>, Alyssa Huang
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 39 +++++++----
.../scala/unit/kafka/server/KafkaConfigTest.scala | 76 ++++++++++++++++++++++
2 files changed, 103 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7992f833bc3..e22cc103408 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -747,17 +747,42 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
- val controllerAdvertisedListeners = advertisedListeners.filter(l =>
controllerListenerNames.contains(l.listenerName.value()))
+ val advertisedListenersProp =
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+ val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
+ CoreUtils.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+ .filter(l => controllerListenerNames.contains(l.listenerName.value()))
+ } else {
+ Seq.empty
+ }
val controllerListenersValue = controllerListeners
controllerListenerNames.flatMap { name =>
controllerAdvertisedListeners
.find(endpoint =>
endpoint.listenerName.equals(ListenerName.normalised(name)))
- .orElse(controllerListenersValue.find(endpoint =>
endpoint.listenerName.equals(ListenerName.normalised(name))))
+ .orElse(
+ // If users don't define advertised.listeners, the advertised
controller listeners inherit from listeners configuration
+ // which match listener names in controller.listener.names.
+ // Removing "0.0.0.0" host to avoid validation errors. This is to be
compatible with the old behavior before 3.9.
+ // The null or "" host does a reverse lookup in
ListenerInfo#withWildcardHostnamesResolved.
+ controllerListenersValue
+ .find(endpoint =>
endpoint.listenerName.equals(ListenerName.normalised(name)))
+ .map(endpoint => if (endpoint.host == "0.0.0.0") {
+ new EndPoint(null, endpoint.port, endpoint.listenerName,
endpoint.securityProtocol)
+ } else {
+ endpoint
+ })
+ )
}
}
def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
+ val advertisedListenersProp =
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+ val advertisedListeners = if (advertisedListenersProp != null) {
+ CoreUtils.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+ } else {
+ listeners
+ }
+ // Only expose broker listeners
advertisedListeners.filter(l => {
if (!controllerListenerNames.contains(l.listenerName.value())) {
true
@@ -771,16 +796,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
})
}
- // Use advertised listeners if defined, fallback to listeners otherwise
- private def advertisedListeners: Seq[EndPoint] = {
- val advertisedListenersProp =
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
- if (advertisedListenersProp != null) {
- CoreUtils.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
- } else {
- listeners
- }
- }
-
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName,
SecurityProtocol) = {
Option(getString(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG))
match {
case Some(_) if
originals.containsKey(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG)
=>
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 541080aee8a..9c924297c12 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1350,6 +1350,82 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props)
}
+ @Test
+ def testImplicitAllBindingListenersCanBeAdvertisedForBroker(): Unit = {
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+ val listeners = "PLAINTEXT://:9092"
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+ props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
listeners) // explicitly setting it in broker
+ props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER")
+ props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
+
+ // Valid
+ KafkaConfig.fromProps(props)
+
+ // Also valid if we allow advertised listeners to derive from listeners
+ props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+ KafkaConfig.fromProps(props)
+ }
+
+ @Test
+ def testExplicitAllBindingListenersCannotBeUsedForBroker(): Unit = {
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
+ val listeners = "PLAINTEXT://0.0.0.0:9092"
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+ props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
listeners) // explicitly setting it in KRaft
+ props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER")
+ props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:9093")
+
+ val expectedExceptionContainsText = "advertised.listeners cannot use the
nonroutable meta-address 0.0.0.0. Use a routable IP address."
+ assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+ // invalid if we allow advertised listeners to derive from listeners
+ props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+ assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+ }
+
+ @Test
+ def
testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController():
Unit = {
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+ val listeners = "CONTROLLER://:9093"
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+ props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
listeners) // explicitly setting it in KRaft
+ props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER")
+ props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+
+ // Valid
+ KafkaConfig.fromProps(props)
+
+ // Also valid if we allow advertised listeners to derive from
listeners/controller.listener.names
+ props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+ KafkaConfig.fromProps(props)
+ }
+
+ @Test
+ def
testExplicitAllBindingControllerListenersCanBeAdvertisedForKRaftController():
Unit = {
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+ val listeners = "CONTROLLER://0.0.0.0:9093"
+ props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, listeners)
+ props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
listeners) // explicitly setting it in KRaft
+ props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"CONTROLLER")
+ props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
+ props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
+
+ val expectedExceptionContainsText = "advertised.listeners cannot use the
nonroutable meta-address 0.0.0.0. Use a routable IP address."
+ assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+ // Valid if we allow advertised listeners to derive from
listeners/controller.listener.names
+ props.remove(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
+ KafkaConfig.fromProps(props)
+ }
+
@Test
def testControllerListenersCanBeAdvertisedForKRaftCombined(): Unit = {
val props = new Properties()