This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 73f40518c6c MINOR: Move listener validation logic from KafkaConfig to
AbstractKafkaConfig (#20567)
73f40518c6c is described below
commit 73f40518c6c1fa3ce7b6dc23048e500a4b7c9521
Author: TaiJuWu <[email protected]>
AuthorDate: Sat Jan 31 10:53:09 2026 +0800
MINOR: Move listener validation logic from KafkaConfig to
AbstractKafkaConfig (#20567)
This patch moves the listener parsing and validation logic (specifically
`listenerListToEndPoints`, its nested `validate` and
`validateOneIsIpv4AndOtherIpv6` helpers, and `checkDuplicateListenerPorts`)
from the Scala `KafkaConfig` companion object to static methods on the
Java `AbstractKafkaConfig` class.
Reviewers: Ken Huang <[email protected]>, TengYao Chi
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
build.gradle | 3 +-
checkstyle/import-control-server.xml | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 82 +---------------------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 8 +--
.../kafka/server/config/AbstractKafkaConfig.java | 82 ++++++++++++++++++++++
5 files changed, 92 insertions(+), 84 deletions(-)
diff --git a/build.gradle b/build.gradle
index 73bf31153a7..59f83131212 100644
--- a/build.gradle
+++ b/build.gradle
@@ -969,6 +969,8 @@ project(':server') {
compileOnly libs.bndlib
compileOnly libs.spotbugs
+ implementation libs.commonsValidator
+
implementation project(':clients')
implementation project(':metadata')
implementation project(':server-common')
@@ -1091,7 +1093,6 @@ project(':core') {
implementation project(':share-coordinator')
implementation libs.argparse4j
- implementation libs.commonsValidator
implementation libs.jacksonDatabind
implementation libs.jacksonDataformatCsv
implementation libs.jacksonJDK8Datatypes
diff --git a/checkstyle/import-control-server.xml
b/checkstyle/import-control-server.xml
index 4b087d70e36..74f60c8f162 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -32,6 +32,7 @@
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="net.jqwik.api" />
+ <allow pkg="org.apache.commons.validator.routines" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 740130cfac2..1cc68ff31f1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
import java.util.Properties
import kafka.utils.Logging
import kafka.utils.Implicits._
-import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.ConfigKey
@@ -144,81 +143,6 @@ object KafkaConfig {
}
output
}
-
- def listenerListToEndPoints(listeners: java.util.List[String],
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]):
Seq[Endpoint] = {
- listenerListToEndPoints(listeners, securityProtocolMap,
requireDistinctPorts = true)
- }
-
- private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners:
java.util.List[String]): Unit = {
- val distinctPorts = endpoints.map(_.port).distinct
- require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
- }
-
- def listenerListToEndPoints(listeners: java.util.List[String],
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol],
requireDistinctPorts: Boolean): Seq[Endpoint] = {
- def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean
= {
- val inetAddressValidator = InetAddressValidator.getInstance()
- (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
- (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
- }
-
- def validate(endPoints: Seq[Endpoint]): Unit = {
- val distinctListenerNames = endPoints.map(_.listener).distinct
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
-
- val (duplicatePorts, _) = endPoints.filter {
- // filter port 0 for unit tests
- ep => ep.port != 0
- }.groupBy(_.port).partition {
- case (_, endpoints) => endpoints.size > 1
- }
-
- // Exception case, let's allow duplicate ports if one host is on IPv4
and the other one is on IPv6
- val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
- case (port, eps) =>
- (port, eps.partition(ep =>
- ep.host != null &&
InetAddressValidator.getInstance().isValid(ep.host)
- ))
- }
-
- // Iterate through every grouping of duplicates by port to see if they
are valid
- duplicatePortsPartitionedByValidIps.foreach {
- case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
- if (requireDistinctPorts)
- checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
-
- duplicatesWithIpHosts match {
- case eps if eps.isEmpty =>
- case Seq(ep1, ep2) =>
- if (requireDistinctPorts) {
- val errorMessage = "If you have two listeners on " +
- s"the same port then one needs to be IPv4 and the other
IPv6, listeners: $listeners, port: $port"
- require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host),
errorMessage)
-
- // If we reach this point it means that even though
duplicatesWithIpHosts in isolation can be valid, if
- // there happens to be ANOTHER listener on this port without
an IP host (such as a null host) then its
- // not valid.
- if (duplicatesWithoutIpHosts.nonEmpty)
- throw new IllegalArgumentException(errorMessage)
- }
- case _ =>
- // Having more than 2 duplicate endpoints doesn't make sense
since we only have 2 IP stacks (one is IPv4
- // and the other is IPv6)
- if (requireDistinctPorts)
- throw new IllegalArgumentException("Each listener must have a
different port unless exactly one listener has " +
- s"an IPv4 address and the other IPv6 address, listeners:
$listeners, port: $port")
- }
- }
- }
-
- val endPoints = try {
- SocketServerConfigs.listenerListToEndPoints(listeners,
securityProtocolMap).asScala
- } catch {
- case e: Exception =>
- throw new IllegalArgumentException(s"Error creating broker listeners
from '$listeners': ${e.getMessage}", e)
- }
- validate(endPoints)
- endPoints
- }
}
/**
@@ -516,7 +440,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
def listeners: Seq[Endpoint] =
-
KafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG),
effectiveListenerSecurityProtocolMap)
+
AbstractKafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG),
effectiveListenerSecurityProtocolMap).asScala
def controllerListeners: Seq[Endpoint] =
listeners.filter(l => controllerListenerNames.contains(l.listener))
@@ -533,7 +457,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
def effectiveAdvertisedControllerListeners: Seq[Endpoint] = {
val advertisedListenersProp =
getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
- KafkaConfig.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+ AbstractKafkaConfig.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, false).asScala
.filter(l => controllerListenerNames.contains(l.listener))
} else {
Seq.empty
@@ -563,7 +487,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
// Use advertised listeners if defined, fallback to listeners otherwise
val advertisedListenersProp =
getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val advertisedListeners = if (advertisedListenersProp != null) {
- KafkaConfig.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+ AbstractKafkaConfig.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, false).asScala
} else {
listeners
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e34a2a85fd2..82f04d9d059 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -38,7 +38,7 @@ import
org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
-import org.apache.kafka.server.config.{DelegationTokenManagerConfigs,
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs,
ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{AbstractKafkaConfig,
DelegationTokenManagerConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs,
ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.storage.internals.log.CleanerConfig
@@ -608,7 +608,7 @@ class KafkaConfigTest {
private def listenerListToEndPoints(listenerList: java.util.List[String],
securityProtocolMap: util.Map[ListenerName,
SecurityProtocol] = SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO) =
- KafkaConfig.listenerListToEndPoints(listenerList, securityProtocolMap)
+ AbstractKafkaConfig.listenerListToEndPoints(listenerList,
securityProtocolMap)
@Test
def testListenerDefaults(): Unit = {
@@ -620,9 +620,9 @@ class KafkaConfigTest {
// configuration with no listeners
val conf = KafkaConfig.fromProps(props)
- assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")),
conf.listeners)
+
assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")).asScala,
conf.listeners)
assertNull(conf.listeners.find(_.securityProtocol ==
SecurityProtocol.PLAINTEXT).get.host)
- assertEquals(conf.effectiveAdvertisedBrokerListeners,
listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")))
+
assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")).asScala,
conf.effectiveAdvertisedBrokerListeners)
}
private def isValidKafkaConfig(props: Properties): Boolean = {
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index e649d6e6035..973f71c680f 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.config;
+import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.AbstractConfig;
@@ -41,10 +42,13 @@ import org.apache.kafka.server.util.Csv;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
+import org.apache.commons.validator.routines.InetAddressValidator;
+
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -54,6 +58,9 @@ import java.util.stream.Collectors;
* For more details check KAFKA-15853
*/
public abstract class AbstractKafkaConfig extends AbstractConfig {
+
+ private static final InetAddressValidator INET_ADDRESS_VALIDATOR =
InetAddressValidator.getInstance();
+
public static final ConfigDef CONFIG_DEF = Utils.mergeConfigs(List.of(
RemoteLogManagerConfig.configDef(),
ServerConfigs.CONFIG_DEF,
@@ -179,6 +186,81 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
}
}
+ public static List<Endpoint> listenerListToEndPoints(List<String>
listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap) {
+ return listenerListToEndPoints(listeners, securityProtocolMap, true);
+ }
+
+ public static List<Endpoint> listenerListToEndPoints(List<String>
listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap, boolean
requireDistinctPorts) {
+ try {
+ List<Endpoint> endPoints =
SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap);
+ validate(endPoints, listeners, requireDistinctPorts);
+ return endPoints;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Error creating
broker listeners from '%s': %s", listeners, e.getMessage()), e);
+ }
+ }
+
+ private static void validate(List<Endpoint> endPoints, List<String>
listeners, boolean requireDistinctPorts) {
+ long distinctListenerNames =
endPoints.stream().map(Endpoint::listener).distinct().count();
+ if (distinctListenerNames != endPoints.size()) {
+ throw new IllegalArgumentException("Each listener must have a
different name, listeners: " + listeners);
+ }
+
+ if (!requireDistinctPorts) return;
+
+ endPoints.stream()
+ .filter(ep -> ep.port() != 0) // filter port 0 for unit tests
+ .collect(Collectors.groupingBy(Endpoint::port))
+ .entrySet().stream()
+ .filter(entry -> entry.getValue().size() > 1)
+ .forEach(entry -> {
+ // Iterate through every grouping of duplicates by port to see
if they are valid
+ int port = entry.getKey();
+ List<Endpoint> eps = entry.getValue();
+ // Exception case, let's allow duplicate ports if one host is
on IPv4 and the other one is on IPv6
+ Map<Boolean, List<Endpoint>> partitionedByValidIp =
eps.stream()
+ .collect(Collectors.partitioningBy(ep -> ep.host() !=
null && INET_ADDRESS_VALIDATOR.isValid(ep.host())));
+
+ List<Endpoint> duplicatesWithIpHosts =
partitionedByValidIp.get(true);
+ List<Endpoint> duplicatesWithoutIpHosts =
partitionedByValidIp.get(false);
+
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts,
listeners);
+
+ if (duplicatesWithIpHosts.isEmpty()) return;
+ if (duplicatesWithIpHosts.size() == 2) {
+ String errorMessage = "If you have two listeners on the
same port then one needs to be IPv4 and the other IPv6, listeners: " +
listeners + ", port: " + port;
+ Endpoint ep1 = duplicatesWithIpHosts.get(0);
+ Endpoint ep2 = duplicatesWithIpHosts.get(1);
+ if (!validateOneIsIpv4AndOtherIpv6(ep1.host(),
ep2.host())) {
+ throw new IllegalArgumentException(errorMessage);
+ }
+
+ // If we reach this point it means that even though
duplicatesWithIpHosts in isolation can be valid, if
+ // there happens to be ANOTHER listener on this port
without an IP host (such as a null host) then its
+ // not valid.
+ if (!duplicatesWithoutIpHosts.isEmpty()) {
+ throw new IllegalArgumentException(errorMessage);
+ }
+ return;
+ }
+ // Having more than 2 duplicate endpoints doesn't make sense
since we only have 2 IP stacks (one is IPv4
+ // and the other is IPv6)
+ throw new IllegalArgumentException("Each listener must have a
different port unless exactly one listener has an IPv4 address and the other
IPv6 address, listeners: " + listeners + ", port: " + port);
+ });
+ }
+
+ private static boolean validateOneIsIpv4AndOtherIpv6(String first, String
second) {
+ return (INET_ADDRESS_VALIDATOR.isValidInet4Address(first) &&
INET_ADDRESS_VALIDATOR.isValidInet6Address(second)) ||
+ (INET_ADDRESS_VALIDATOR.isValidInet6Address(first) &&
INET_ADDRESS_VALIDATOR.isValidInet4Address(second));
+ }
+
+ private static void checkDuplicateListenerPorts(List<Endpoint> endpoints,
List<String> listeners) {
+ Set<Integer> distinctPorts =
endpoints.stream().map(Endpoint::port).collect(Collectors.toSet());
+ if (endpoints.size() != distinctPorts.size()) {
+ throw new IllegalArgumentException("Each listener must have a
different port, listeners: " + listeners);
+ }
+ }
+
private static SecurityProtocol securityProtocol(String protocolName,
String configName) {
try {
return SecurityProtocol.forName(protocolName);