This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 73f40518c6c MINOR: Move listener validation logic from KafkaConfig to AbstractKafkaConfig (#20567)
73f40518c6c is described below

commit 73f40518c6c1fa3ce7b6dc23048e500a4b7c9521
Author: TaiJuWu <[email protected]>
AuthorDate: Sat Jan 31 10:53:09 2026 +0800

    MINOR: Move listener validation logic from KafkaConfig to AbstractKafkaConfig (#20567)
    
    This patch moves the listener parsing and validation logic (specifically
    `listenerListToEndPoints` and its internal `validate` methods) from the
    Scala `KafkaConfig` class to the Java `AbstractKafkaConfig` class.
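
    A minimal usage sketch of the relocated Java entry point follows (the
    wrapper class name is hypothetical, and the listener string and default
    protocol map simply mirror the updated test helper below):

        import java.util.List;

        import org.apache.kafka.common.Endpoint;
        import org.apache.kafka.network.SocketServerConfigs;
        import org.apache.kafka.server.config.AbstractKafkaConfig;

        public class ListenerParseSketch {
            public static void main(String[] args) {
                // Parses the listener string and applies the same name/port
                // validation that previously lived in KafkaConfig.
                List<Endpoint> endpoints = AbstractKafkaConfig.listenerListToEndPoints(
                    List.of("PLAINTEXT://:9092"),
                    SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO);
                endpoints.forEach(ep -> System.out.println(ep.listener() + " -> " + ep.port()));
            }
        }

    Duplicate listener names, or duplicate ports that are not an IPv4/IPv6
    pair, are still rejected with an IllegalArgumentException, as before.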
    
    Reviewers: Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |  3 +-
 checkstyle/import-control-server.xml               |  1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala | 82 +---------------------
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  8 +--
 .../kafka/server/config/AbstractKafkaConfig.java   | 82 ++++++++++++++++++++++
 5 files changed, 92 insertions(+), 84 deletions(-)

diff --git a/build.gradle b/build.gradle
index 73bf31153a7..59f83131212 100644
--- a/build.gradle
+++ b/build.gradle
@@ -969,6 +969,8 @@ project(':server') {
     compileOnly libs.bndlib
     compileOnly libs.spotbugs
 
+    implementation libs.commonsValidator
+
     implementation project(':clients')
     implementation project(':metadata')
     implementation project(':server-common')
@@ -1091,7 +1093,6 @@ project(':core') {
     implementation project(':share-coordinator')
 
     implementation libs.argparse4j
-    implementation libs.commonsValidator
     implementation libs.jacksonDatabind
     implementation libs.jacksonDataformatCsv
     implementation libs.jacksonJDK8Datatypes
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 4b087d70e36..74f60c8f162 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -32,6 +32,7 @@
   <allow pkg="javax.net.ssl" />
   <allow pkg="javax.security" />
   <allow pkg="net.jqwik.api" />
+  <allow pkg="org.apache.commons.validator.routines" />
 
   <!-- no one depends on the server -->
   <disallow pkg="kafka" />
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 740130cfac2..1cc68ff31f1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit
 import java.util.Properties
 import kafka.utils.Logging
 import kafka.utils.Implicits._
-import org.apache.commons.validator.routines.InetAddressValidator
 import org.apache.kafka.common.{Endpoint, Reconfigurable}
 import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, TopicConfig}
 import org.apache.kafka.common.config.ConfigDef.ConfigKey
@@ -144,81 +143,6 @@ object KafkaConfig {
     }
     output
   }
-
-  def listenerListToEndPoints(listeners: java.util.List[String], securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
-    listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true)
-  }
-
-  private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: java.util.List[String]): Unit = {
-    val distinctPorts = endpoints.map(_.port).distinct
-    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
-  }
-
-  def listenerListToEndPoints(listeners: java.util.List[String], securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[Endpoint] = {
-    def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean = {
-      val inetAddressValidator = InetAddressValidator.getInstance()
-      (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
-        (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
-    }
-
-    def validate(endPoints: Seq[Endpoint]): Unit = {
-      val distinctListenerNames = endPoints.map(_.listener).distinct
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-
-      val (duplicatePorts, _) = endPoints.filter {
-        // filter port 0 for unit tests
-        ep => ep.port != 0
-      }.groupBy(_.port).partition {
-        case (_, endpoints) => endpoints.size > 1
-      }
-
-      // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
-      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
-        case (port, eps) =>
-          (port, eps.partition(ep =>
-            ep.host != null && InetAddressValidator.getInstance().isValid(ep.host)
-          ))
-      }
-
-      // Iterate through every grouping of duplicates by port to see if they are valid
-      duplicatePortsPartitionedByValidIps.foreach {
-        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
-          if (requireDistinctPorts)
-            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
-
-          duplicatesWithIpHosts match {
-            case eps if eps.isEmpty =>
-            case Seq(ep1, ep2) =>
-              if (requireDistinctPorts) {
-                val errorMessage = "If you have two listeners on " +
-                  s"the same port then one needs to be IPv4 and the other IPv6, listeners: $listeners, port: $port"
-                require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), errorMessage)
-
-                // If we reach this point it means that even though duplicatesWithIpHosts in isolation can be valid, if
-                // there happens to be ANOTHER listener on this port without an IP host (such as a null host) then its
-                // not valid.
-                if (duplicatesWithoutIpHosts.nonEmpty)
-                  throw new IllegalArgumentException(errorMessage)
-              }
-            case _ =>
-              // Having more than 2 duplicate endpoints doesn't make sense since we only have 2 IP stacks (one is IPv4
-              // and the other is IPv6)
-              if (requireDistinctPorts)
-                throw new IllegalArgumentException("Each listener must have a different port unless exactly one listener has " +
-                  s"an IPv4 address and the other IPv6 address, listeners: $listeners, port: $port")
-          }
-      }
-    }
-
-    val endPoints = try {
-      SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap).asScala
-    } catch {
-      case e: Exception =>
-        throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e)
-    }
-    validate(endPoints)
-    endPoints
-  }
 }
 
 /**
@@ -516,7 +440,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   }
 
   def listeners: Seq[Endpoint] =
-    KafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG), effectiveListenerSecurityProtocolMap)
+    AbstractKafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG), effectiveListenerSecurityProtocolMap).asScala
 
   def controllerListeners: Seq[Endpoint] =
     listeners.filter(l => controllerListenerNames.contains(l.listener))
@@ -533,7 +457,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   def effectiveAdvertisedControllerListeners: Seq[Endpoint] = {
     val advertisedListenersProp = getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
     val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
-      KafkaConfig.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+      AbstractKafkaConfig.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, false).asScala
         .filter(l => controllerListenerNames.contains(l.listener))
     } else {
       Seq.empty
@@ -563,7 +487,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
     // Use advertised listeners if defined, fallback to listeners otherwise
     val advertisedListenersProp = getList(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
     val advertisedListeners = if (advertisedListenersProp != null) {
-      KafkaConfig.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
+      AbstractKafkaConfig.listenerListToEndPoints(advertisedListenersProp, effectiveListenerSecurityProtocolMap, false).asScala
     } else {
       listeners
     }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e34a2a85fd2..82f04d9d059 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
-import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.MetricConfigs
 import org.apache.kafka.storage.internals.log.CleanerConfig
@@ -608,7 +608,7 @@ class KafkaConfigTest {
 
   private def listenerListToEndPoints(listenerList: java.util.List[String],
                               securityProtocolMap: util.Map[ListenerName, SecurityProtocol] = SocketServerConfigs.DEFAULT_NAME_TO_SECURITY_PROTO) =
-    KafkaConfig.listenerListToEndPoints(listenerList, securityProtocolMap)
+    AbstractKafkaConfig.listenerListToEndPoints(listenerList, securityProtocolMap)
 
   @Test
   def testListenerDefaults(): Unit = {
@@ -620,9 +620,9 @@ class KafkaConfigTest {
 
     // configuration with no listeners
     val conf = KafkaConfig.fromProps(props)
-    assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")), conf.listeners)
+    assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")).asScala, conf.listeners)
     assertNull(conf.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host)
-    assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")))
+    assertEquals(listenerListToEndPoints(util.List.of("PLAINTEXT://:9092")).asScala, conf.effectiveAdvertisedBrokerListeners)
   }
 
   private def isValidKafkaConfig(props: Properties): Boolean = {
diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index e649d6e6035..973f71c680f 100644
--- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.config.AbstractConfig;
@@ -41,10 +42,13 @@ import org.apache.kafka.server.util.Csv;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.storage.internals.log.LogConfig;
 
+import org.apache.commons.validator.routines.InetAddressValidator;
+
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -54,6 +58,9 @@ import java.util.stream.Collectors;
  * For more details check KAFKA-15853
  */
 public abstract class AbstractKafkaConfig extends AbstractConfig {
+
+    private static final InetAddressValidator INET_ADDRESS_VALIDATOR = InetAddressValidator.getInstance();
+
     public static final ConfigDef CONFIG_DEF = Utils.mergeConfigs(List.of(
         RemoteLogManagerConfig.configDef(),
         ServerConfigs.CONFIG_DEF,
@@ -179,6 +186,81 @@ public abstract class AbstractKafkaConfig extends AbstractConfig {
         }
     }
 
+    public static List<Endpoint> listenerListToEndPoints(List<String> listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap) {
+        return listenerListToEndPoints(listeners, securityProtocolMap, true);
+    }
+
+    public static List<Endpoint> listenerListToEndPoints(List<String> listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap, boolean requireDistinctPorts) {
+        try {
+            List<Endpoint> endPoints = SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap);
+            validate(endPoints, listeners, requireDistinctPorts);
+            return endPoints;
+        } catch (Exception e) {
+            throw new IllegalArgumentException(String.format("Error creating broker listeners from '%s': %s", listeners, e.getMessage()), e);
+        }
+    }
+
+    private static void validate(List<Endpoint> endPoints, List<String> listeners, boolean requireDistinctPorts) {
+        long distinctListenerNames = endPoints.stream().map(Endpoint::listener).distinct().count();
+        if (distinctListenerNames != endPoints.size()) {
+            throw new IllegalArgumentException("Each listener must have a different name, listeners: " + listeners);
+        }
+
+        if (!requireDistinctPorts) return;
+
+        endPoints.stream()
+            .filter(ep -> ep.port() != 0) // filter port 0 for unit tests
+            .collect(Collectors.groupingBy(Endpoint::port))
+            .entrySet().stream()
+            .filter(entry -> entry.getValue().size() > 1)
+            .forEach(entry -> {
+                // Iterate through every grouping of duplicates by port to see if they are valid
+                int port = entry.getKey();
+                List<Endpoint> eps = entry.getValue();
+                // Exception case, let's allow duplicate ports if one host is on IPv4 and the other one is on IPv6
+                Map<Boolean, List<Endpoint>> partitionedByValidIp = eps.stream()
+                        .collect(Collectors.partitioningBy(ep -> ep.host() != null && INET_ADDRESS_VALIDATOR.isValid(ep.host())));
+
+                List<Endpoint> duplicatesWithIpHosts = partitionedByValidIp.get(true);
+                List<Endpoint> duplicatesWithoutIpHosts = partitionedByValidIp.get(false);
+
+                checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners);
+
+                if (duplicatesWithIpHosts.isEmpty()) return;
+                if (duplicatesWithIpHosts.size() == 2) {
+                    String errorMessage = "If you have two listeners on the same port then one needs to be IPv4 and the other IPv6, listeners: " + listeners + ", port: " + port;
+                    Endpoint ep1 = duplicatesWithIpHosts.get(0);
+                    Endpoint ep2 = duplicatesWithIpHosts.get(1);
+                    if (!validateOneIsIpv4AndOtherIpv6(ep1.host(), ep2.host())) {
+                        throw new IllegalArgumentException(errorMessage);
+                    }
+
+                    // If we reach this point it means that even though duplicatesWithIpHosts in isolation can be valid, if
+                    // there happens to be ANOTHER listener on this port without an IP host (such as a null host) then its
+                    // not valid.
+                    if (!duplicatesWithoutIpHosts.isEmpty()) {
+                        throw new IllegalArgumentException(errorMessage);
+                    }
+                    return;
+                }
+                // Having more than 2 duplicate endpoints doesn't make sense since we only have 2 IP stacks (one is IPv4
+                // and the other is IPv6)
+                throw new IllegalArgumentException("Each listener must have a different port unless exactly one listener has an IPv4 address and the other IPv6 address, listeners: " + listeners + ", port: " + port);
+            });
+    }
+
+    private static boolean validateOneIsIpv4AndOtherIpv6(String first, String second) {
+        return (INET_ADDRESS_VALIDATOR.isValidInet4Address(first) && INET_ADDRESS_VALIDATOR.isValidInet6Address(second)) ||
+                (INET_ADDRESS_VALIDATOR.isValidInet6Address(first) && INET_ADDRESS_VALIDATOR.isValidInet4Address(second));
+    }
+
+    private static void checkDuplicateListenerPorts(List<Endpoint> endpoints, List<String> listeners) {
+        Set<Integer> distinctPorts = endpoints.stream().map(Endpoint::port).collect(Collectors.toSet());
+        if (endpoints.size() != distinctPorts.size()) {
+            throw new IllegalArgumentException("Each listener must have a different port, listeners: " + listeners);
+        }
+    }
+
     private static SecurityProtocol securityProtocol(String protocolName, String configName) {
         try {
             return SecurityProtocol.forName(protocolName);
