showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r929446869


##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,62 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =

Review Comment:
   nit: since methods exposed in `CoreUtils` are expected to be used by other 
instances, I don't think we expect the `validateOneIsIpv4AndOtherIpv6` or 
`checkDuplicateListenerPorts` to be used by other cases. Maybe we can move them 
into `listenerListToEndPoints` like what `validate` method do.



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,60 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, _) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they 
are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   I checked, the `apache.org` will be seen as `duplicatesWithoutIpHosts`, and 
it'll enter here:
   ```
   if (duplicatesWithoutIpHosts.nonEmpty)
         throw new IllegalArgumentException("...")
   ```
   which should also throw exception, and pass the test, unless the error 
message is not expected. But, if you think my above suggestion doesn't look 
better, you can keep current implementation. Just want to let you know I think 
there's a better and more clear way to implement the logic. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to