This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 61c53cd702e [SPARK-42173][CORE] RpcAddress equality can fail
61c53cd702e is described below
commit 61c53cd702e51affa4436d77b6aaf7fc25cb7808
Author: Holden Karau <[email protected]>
AuthorDate: Thu Jan 26 09:12:48 2023 -0800
[SPARK-42173][CORE] RpcAddress equality can fail
### What changes were proposed in this pull request?
When constructing an RpcAddress, use InetAddresses to produce a consistently
formatted IPv6 address whenever the host is an IPv6 address.
### Why are the changes needed?
We rely on RpcAddress equality for various tasks involving executors, and an
equality mismatch (e.g. [::0:1] vs. [::1] for the same host) can cause hard-to-diagnose errors.
### Does this PR introduce _any_ user-facing change?
Log messages might change: IPv6 addresses that were sometimes logged with all of
their zeros written out will now be logged in compressed form.
### How was this patch tested?
Existing tests, plus new unit tests showing that [::0:1] is formatted as [::1] and that [2600::] (trailing zeros) is preserved.
Closes #39728 from holdenk/SPARK-42173-ipv6-sparse.
Authored-by: Holden Karau <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit e30bb538e480940b1963eb14c3267662912d8584)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/rpc/RpcAddress.scala | 15 ++++++++++-----
core/src/main/scala/org/apache/spark/util/Utils.scala | 18 ++++++++++++++++++
.../scala/org/apache/spark/rpc/RpcAddressSuite.scala | 10 ++++++++++
3 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
index 675dc24206a..1fa22451e5d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
@@ -23,9 +23,7 @@ import org.apache.spark.util.Utils
/**
* Address for an RPC environment, with hostname and port.
*/
-private[spark] case class RpcAddress(_host: String, port: Int) {
-
- lazy val host: String = Utils.addBracketsIfNeeded(_host)
+private[spark] case class RpcAddress(host: String, port: Int) {
def hostPort: String = host + ":" + port
@@ -38,15 +36,22 @@ private[spark] case class RpcAddress(_host: String, port:
Int) {
private[spark] object RpcAddress {
+ def apply(host: String, port: Int): RpcAddress = {
+ new RpcAddress(
+ Utils.normalizeIpIfNeeded(host),
+ port
+ )
+ }
+
/** Return the [[RpcAddress]] represented by `uri`. */
def fromUrlString(uri: String): RpcAddress = {
val uriObj = new java.net.URI(uri)
- RpcAddress(uriObj.getHost, uriObj.getPort)
+ apply(uriObj.getHost, uriObj.getPort)
}
/** Returns the [[RpcAddress]] encoded in the form of "spark://host:port" */
def fromSparkURL(sparkUrl: String): RpcAddress = {
val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
- RpcAddress(host, port)
+ apply(host, port)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fb073595147..9bf45ed3776 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1109,6 +1109,24 @@ private[spark] object Utils extends Logging {
}
}
+ /**
+ * Normalize IPv6 IPs and no-op on all other hosts.
+ */
+ private[spark] def normalizeIpIfNeeded(host: String): String = {
+ // Is this a v6 address. We ask users to add [] around v6 addresses as
strs but
+ // there not always there. If it's just 0-9 and : and [] we treat it as a
v6 address.
+ // This means some invalid addresses are treated as v6 addresses, but
since they are
+ // not valid hostnames it doesn't matter.
+ // See https://www.rfc-editor.org/rfc/rfc1123#page-13 for context around
valid hostnames.
+ val addressRe = """^\[{0,1}([0-9:]+?:[0-9]*)\]{0,1}$""".r
+ host match {
+ case addressRe(unbracketed) =>
+
addBracketsIfNeeded(InetAddresses.toAddrString(InetAddresses.forString(unbracketed)))
+ case _ =>
+ host
+ }
+ }
+
/**
* Checks if the host contains only valid hostname/ip without port
* NOTE: Incase of IPV6 ip it should be enclosed inside []
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
index 0f7c9d71330..9fb08c79420 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcAddressSuite.scala
@@ -70,4 +70,14 @@ class RpcAddressSuite extends SparkFunSuite {
val address = RpcAddress("::1", 1234)
assert(address.toSparkURL == "spark://[::1]:1234")
}
+
+ test("SPARK-42173: Consistent Sparse Mapping") {
+ val address = RpcAddress("::0:1", 1234)
+ assert(address.toSparkURL == "spark://[::1]:1234")
+ }
+
+ test("SPARK-42173: Consistent Sparse Mapping trailing 0s") {
+ val address = RpcAddress("2600::", 1234)
+ assert(address.toSparkURL == "spark://[2600::]:1234")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]