showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928891618


##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,60 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, _) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they 
are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   I think we should throw an exception if `duplicatesWithoutIpHosts` is not 
empty here, because we only expect 
`duplicatesWithIpHosts/duplicatesWithoutIpHosts` to be `"2 endpoints"/empty 
list`, right? After all, we only allow duplicate ports when the two hosts are 
**valid** IPv4 and IPv6 addresses, so we should be able to throw the exception 
directly here and remove the check below:
   ```
   if (duplicatesWithoutIpHosts.nonEmpty)
         throw new IllegalArgumentException(errorMessage)
   ```
   WDYT?
   
   Something like this:
   ```
   case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
     // It's invalid for a listener on this port to have no IP host (such as a 
null host) 
     if (duplicatesWithoutIpHosts.nonEmpty)
         throw new IllegalArgumentException("...")
   
     duplicatesWithIpHosts match {
       case eps ....
       if (requireDistinctPorts)
                   require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), 
"If you have two listeners on " +
                     s"the same port then one needs to be IPv4 and the other 
IPv6, listeners: $listeners, port: $port")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to