showuon commented on code in PR #11478:
URL: https://github.com/apache/kafka/pull/11478#discussion_r928892036
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -206,6 +206,51 @@ class KafkaConfigTest {
assertBadConfigContainingMessage(props, "Each listener must have a
different name")
}
+ @Test
+ def testIPv4AndIPv6SamePortListeners(): Unit = {
+ val props = new Properties()
+ props.put(KafkaConfig.BrokerIdProp, "1")
+ props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+
+ props.put(KafkaConfig.ListenersProp,
"PLAINTEXT://[::1]:9092,SSL://[::1]:9092")
+ var caught = assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("If you have two listeners on the
same port then one needs to be IPv4 and the other IPv6"))
+
+ props.put(KafkaConfig.ListenersProp,
"PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092")
+ caught = assertThrows(classOf[IllegalArgumentException], () =>
KafkaConfig.fromProps(props))
+ assertTrue(caught.getMessage.contains("If you have two listeners on the
same port then one needs to be IPv4 and the other IPv6"))
+
+ props.put(KafkaConfig.ListenersProp,
"SSL://[::1]:9096,PLAINTEXT://127.0.0.1:9096,SASL_SSL://:9096")
Review Comment:
Thanks for adding this test case.
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,60 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+
+ val (duplicatePorts, _) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ // Exception case, let's allow duplicate ports if one host is on IPv4
and the other one is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach {
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
Review Comment:
I think we should throw an exception if `duplicatesWithoutIpHosts` is not empty
here, because the only valid combination we expect for
`duplicatesWithIpHosts/duplicatesWithoutIpHosts` is `"2 IPs"/empty list`,
right? After all, we only allow duplicate ports for **valid** IPv4 and IPv6
hosts, so we should be able to throw the exception directly here, and remove
the check below:
```
if (duplicatesWithoutIpHosts.nonEmpty)
throw new IllegalArgumentException(errorMessage)
```
WDYT?
Something like this:
```
case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
// It's invalid if there is a listener on this port without an IP host (such as a
null host)
if (duplicatesWithoutIpHosts.nonEmpty)
throw new IllegalArgumentException("...")
duplicatesWithIpHosts match {
case eps ....
if (requireDistinctPorts)
require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host),
"If you have two listeners on " +
s"the same port then one needs to be IPv4 and the other
IPv6, listeners: $listeners, port: $port")
```
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,63 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerNames(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctListenerNames = endpoints.map(_.listenerName).distinct
+ require(distinctListenerNames.size == endpoints.size, s"Each listener must
have a different name unless you have exactly " +
+ s"one listener on IPv4 and the other IPv6 on the same port, listeners:
$listeners")
+ }
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
- require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+ val (duplicatePorts, nonDuplicatePorts) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ val nonDuplicatePortsOnlyEndpoints = nonDuplicatePorts.flatMap { case
(_, eps) => eps }.toList
+
+ checkDuplicateListenerNames(nonDuplicatePortsOnlyEndpoints, listeners)
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(nonDuplicatePortsOnlyEndpoints, listeners)
+
+ // Exception case, lets allow duplicate ports if the host is on IPv4 and
the other is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map{
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
Review Comment:
Thanks for the explanation. I see.
##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -252,16 +255,60 @@ object CoreUtils {
listenerListToEndPoints(listeners, securityProtocolMap, true)
}
+ def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
+ (inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
+ (inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
+
+ def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners:
String): Unit = {
+ val distinctPorts = endpoints.map(_.port).distinct
+ require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
+ }
+
def listenerListToEndPoints(listeners: String, securityProtocolMap:
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean):
Seq[EndPoint] = {
def validate(endPoints: Seq[EndPoint]): Unit = {
- // filter port 0 for unit tests
- val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
val distinctListenerNames = endPoints.map(_.listenerName).distinct
-
require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
- if (requireDistinctPorts) {
- val distinctPorts = portsExcludingZero.distinct
- require(distinctPorts.size == portsExcludingZero.size, s"Each listener
must have a different port, listeners: $listeners")
+
+ val (duplicatePorts, _) = endPoints.filter {
+ // filter port 0 for unit tests
+ ep => ep.port != 0
+ }.groupBy(_.port).partition {
+ case (_, endpoints) => endpoints.size > 1
+ }
+
+ // Exception case, let's allow duplicate ports if one host is on IPv4
and the other one is on IPv6
+ val duplicatePortsPartitionedByValidIps = duplicatePorts.map {
+ case (port, eps) =>
+ (port, eps.partition(ep =>
+ ep.host != null && inetAddressValidator.isValid(ep.host)
+ ))
+ }
+
+ // Iterate through every grouping of duplicates by port to see if they
are valid
+ duplicatePortsPartitionedByValidIps.foreach {
+ case (port, (duplicatesWithIpHosts, duplicatesWithoutIpHosts)) =>
+ if (requireDistinctPorts)
+ checkDuplicateListenerPorts(duplicatesWithoutIpHosts, listeners)
+
+ duplicatesWithIpHosts match {
+ case eps if eps.isEmpty =>
+ case Seq(ep1, ep2) =>
+ if (requireDistinctPorts) {
+ val errorMessage = "If you have two listeners on " +
+ s"the same port then one needs to be IPv4 and the other
IPv6, listeners: $listeners, port: $port"
+ require(validateOneIsIpv4AndOtherIpv6(ep1.host, ep2.host),
errorMessage)
+
+ // If we reach this point it means that even though
duplicatesWithIpHosts in isolation can be valid, if
+ // there happens to be ANOTHER listener on this port without
an IP host (such as a null host) then its
+ // not valid.
+ if (duplicatesWithoutIpHosts.nonEmpty)
+ throw new IllegalArgumentException(errorMessage)
+ }
+ case _ =>
+ if (requireDistinctPorts)
Review Comment:
nit: we can add a comment here, e.g.:
// more than 2 duplicate endpoints are not allowed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]