showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928380426
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,57 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, let's allow duplicate ports if one host is on IPv4
and the other one is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach {
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
Review Comment:
I'm still confused here. Why should we allow "duplicate port without IP
host"?
Let's take some examples here:
```
SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096
```
Before this PR, this validation would fail. After this PR, we want it to
pass, which is expected.
But consider this case (i.e. one IPv4 host, one IPv6 host, and one null host):
```
SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,SASL_SSL://:9096
```
Before this PR, this validation would fail. But after this PR, it passes
validation. Is that what we expect? I thought that in this PR (KIP) we allow
the same port to be used only when one listener is on IPv4 and the other is on
IPv6. But this case also allows a third listener on the same port. Is that correct?
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,57 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
Review Comment:
This doesn't make sense. We've already split out the `nonDuplicatePorts` group,
which by construction contains no duplicate ports, so why do we still need to
check it for duplicate listener ports?
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
Review Comment:
In that case, should we do this validation at all:
`inetAddressValidator.isValid(ep.host)`? It looks like we don't care whether
the host is valid or not; all we want to filter out is the `null host` case
(i.e. `PLAINTEXT://:9096`). Is my understanding correct?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]