showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r927372305
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach{
Review Comment:
nit: space after `foreach`
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
Review Comment:
nit: need a space after `map`
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
Review Comment:
Before this PR, we'll validate if **all endpoint listenerName** are
distinct. But after this PR, we only validate **partial endpoint listenerName**
are distinct. It will cause some error cannot be caught. Ex:
```
props.put(KafkaConfig.ListenersProp,
"SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,SSL://127.0.0.1:9097")
```
This props will fail before this PR, because there are 2 `SSL` listener
names. But in this PR, we'll group the first two endpoints first and validate
them. Later, validate the last one. And it'll pass validation, which is
unexpected. Please fix it and add a test for it.
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach{
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
Review Comment:
We validate `duplicatesWithIpHosts` and `duplicatesWithoutIpHosts` here, so
it will allow props like this:
```
props.put(KafkaConfig.ListenersProp,
"SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,CONTROLLER://:9096")
```
It that expected?
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach{
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+ duplicatesWithIpHosts match {
+ case s if s.isEmpty =>
+ case Seq(one, two) =>
+ if (requireDistinctPorts)
+ require(validateOneIsIpv4AndOtherIpv6(one.host, two.host), "If
you have two listeners on " +
+ s"the same port then one needs to be IPv4 and the other
IPv6, listeners: $listeners, port: $port")
+ case other =>
+ checkDuplicateListenerNames(other, listeners)
Review Comment:
We can move this listenerNames check below `if (requireDistinctPorts)`
condition, so that we can throw exception directly if `requireDistinctPorts`
true, right?
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -741,12 +741,15 @@ object KafkaConfig {
/** ********* Socket Server Configuration ***********/
val ListenersDoc = "Listener List - Comma-separated list of URIs we will
listen on and the listener names." +
s" If the listener name is not a security protocol,
<code>$ListenerSecurityProtocolMapProp</code> must also be set.\n" +
- " Listener names and port numbers must be unique.\n" +
+ " Listener names and port numbers must be unique unless \n" +
+ " one listener is an IPv4 address and the other listener is \n" +
+ " an IPv6 address (for the same port).\n" +
Review Comment:
nit: I think "for the same port" can be removed since that is already
mentioned in the beginning. WDYT?
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach{
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+ duplicatesWithIpHosts match {
+ case s if s.isEmpty =>
Review Comment:
nit: What does `s` mean here?
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
Review Comment:
Should we throw exception when the `host` is invalid? Currently, we can
allow invalid host pass `validate` method, since we only check duplicate
listenerNames and ports below. Is that correct?
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach{
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+ duplicatesWithIpHosts match {
+ case s if s.isEmpty =>
+ case Seq(one, two) =>
Review Comment:
nit: rename to `Seq(ep1, ep2)` ?
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach{
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ checkDuplicateListenerNames(duplicatesWithoutIpHosts, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+ duplicatesWithIpHosts match {
+ case s if s.isEmpty =>
+ case Seq(one, two) =>
+ if (requireDistinctPorts)
+ require(validateOneIsIpv4AndOtherIpv6(one.host, two.host), "If
you have two listeners on " +
+ s"the same port then one needs to be IPv4 and the other
IPv6, listeners: $listeners, port: $port")
+ case other =>
Review Comment:
Could we rename this variable? ex: `allEps`?
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
Review Comment:
nit: // allow duplicate ports if one host is on IPv4 and the other one is on
IPv6
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]