mdedetrich commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r927539567


##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: $listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case (_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)

Review Comment:
   > Currently, we can allow invalid host pass validate method, since we only 
check duplicate listenerNames and ports below. Is that correct?
   
   Yes, I believe this is correct. Irrespective of this PR, I believe it is also
   the current behavior, i.e. there is no check that the host is valid.
   
   If so, my thoughts are the same as in
   https://github.com/apache/kafka/pull/11478#discussion_r923514829: since this
   is orthogonal to the current change, it should be done in a separate PR if
   necessary (it may also be that some other check, done later in another
   section of the code, verifies that hostnames are valid).
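
To illustrate the point about the existing behavior, here is a minimal sketch that mirrors the removed lines of the diff above. `Ep` and `ValidateSketch` are hypothetical stand-ins introduced only for this example (not Kafka's real `EndPoint` or `CoreUtils`), and the error messages are shortened. It is meant to show that only listener names and ports are compared, so a host that is not a valid address still passes:

```scala
// Hypothetical, simplified stand-in for Kafka's EndPoint (illustration only).
final case class Ep(host: String, port: Int, listenerName: String)

object ValidateSketch {
  // Mirrors the pre-PR checks from the removed diff lines:
  // listener names and ports are compared, the host is never inspected.
  def validate(endPoints: Seq[Ep], requireDistinctPorts: Boolean): Unit = {
    // Port 0 is filtered out, as in the original (it is used by unit tests).
    val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
    val distinctListenerNames = endPoints.map(_.listenerName).distinct

    require(distinctListenerNames.size == endPoints.size,
      "Each listener must have a different name")
    if (requireDistinctPorts) {
      val distinctPorts = portsExcludingZero.distinct
      require(distinctPorts.size == portsExcludingZero.size,
        "Each listener must have a different port")
    }
  }
}
```

Under these assumptions, `ValidateSketch.validate(Seq(Ep("not a host!", 9092, "A"), Ep(null, 9093, "B")), requireDistinctPorts = true)` returns without throwing, whereas two endpoints sharing port 9092 fail the `require` with an `IllegalArgumentException`.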



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
