showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928891618
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,60 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+
+ val (duplicatePorts, _) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ // Exception case, let's allow duplicate ports if one host is on IPv4
and the other one is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach {
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
Review Comment:
I think we should throw an exception if `duplicatesWithoutIpHosts` is not empty
here, because the only combination we actually expect for
`duplicatesWithIpHosts/duplicatesWithoutIpHosts` is `"2 endpoints"/empty
list`, right? After all, we only allow duplicate ports when one host is a
**valid** IPv4 address and the other is a **valid** IPv6 address, so we should
be able to throw the exception directly here and remove the check below:
```
if (duplicatesWithoutIpHosts.nonEmpty)
throw new IllegalArgumentException(errorMessage)
```
What do you think?
Something like this:
```
case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
// It's invalid for a listener on this port to have no IP host (such as a
// null host)
if (duplicatesWithoutIpHosts.nonEmpty)
throw new IllegalArgumentException("...")
duplicatesWithIpHosts match {
case eps ....
case Seq(ep1, ep2) =>
if (requireDistinctPorts)
require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host), "If
you have two listeners on " +
s"the same port then one needs to be IPv4 and the other IPv6,
listeners: $listeners, port: $port")
case _ =>
....
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]