showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928380426


##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,57 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, let's allow duplicate ports if one host is on IPv4 
and the other one is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)
+          ))
+      }
+
+      // Iterate through every grouping of duplicates by port to see if they 
are valid
+      duplicatePortsPartitionedByValidIps.foreach {
+        case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+          if (requireDistinctPorts)
+            checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)

Review Comment:
   I'm still confused here. Why should we allow "duplicate port without IP 
host"? 
   
   Let's take some examples here:
   ```
   SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096
   ```
   Before this PR, this validation would fail. After this PR, we want it to 
pass, which is expected.
   
   But consider this case (i.e. one IPv4 host, one IPv6 host, and one null host):
   ```
   SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,SASL_SSL://:9096
   ```
   Before this PR, this validation would fail. But after this PR, it will pass 
validation. Is that what we expect? I thought that in this PR (KIP) we allow 
the same port to be used only when one listener is on IPv4 and the other is on 
IPv6. But this case also allows a third listener on the same port. Is that 
correct?



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,57 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
       val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
       require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)

Review Comment:
   This doesn't make sense. We've already separated out the `nonDuplicatePorts` 
group, so why do we still need to check it for duplicate listener ports?



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
     listenerListToEndPoints(listeners, securityProtocolMap, true)
   }
 
+  def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+    (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
+      (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
+
+  def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctListenerNames = endpoints.map(_.listenerName).distinct
+    require(distinctListenerNames.size == endpoints.size, s"Each listener must 
have a different name unless you have exactly " +
+      s"one listener on IPv4 and the other IPv6 on the same port, listeners: 
$listeners")
+  }
+
+  def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+    val distinctPorts = endpoints.map(_.port).distinct
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+  }
+
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
     def validate(endPoints: Seq[EndPoint]): Unit = {
-      // filter port 0 for unit tests
-      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
-      require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
-      if (requireDistinctPorts) {
-        val distinctPorts = portsExcludingZero.distinct
-        require(distinctPorts.size == portsExcludingZero.size, s"Each listener 
must have a different port, listeners: $listeners")
+      val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+        // filter port 0 for unit tests
+        ep => ep.port != 0
+      }.groupBy(_.port).partition {
+        case (_, endpoints) => endpoints.size > 1
+      }
+
+      val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case 
(_, eps) => eps }.toList
+
+      checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+      if (requireDistinctPorts)
+        checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+      // Exception case, lets allow duplicate ports if the host is on IPv4 and 
the other is on IPv6
+      val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+        case (port, eps) =>
+          (port, eps.partition(ep =>
+            ep.host != null && inetAddressValidator.isValid(ep.host)

Review Comment:
   In that case, do we need this validation: 
`inetAddressValidator.isValid(ep.host)`? It looks like we don't care whether 
the host is valid or not; all we want to filter out is the `null host` case 
(i.e. `PLAINTEXT://:9096`). Is my understanding correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to