This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7a05b2fc1 [CELEBORN-1016] Fix IPv6 host address resolve issue
7a05b2fc1 is described below
commit 7a05b2fc182622a11114dce478e83a7ec8b558ed
Author: Fei Wang <[email protected]>
AuthorDate: Sat Feb 17 10:31:49 2024 +0800
[CELEBORN-1016] Fix IPv6 host address resolve issue
### What changes were proposed in this pull request?
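To close CELEBORN-1016, fix host/port parsing for IPv6 host addresses: the host part is now everything before the trailing port fields, instead of the first element of a split on ":".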
### Why are the changes needed?
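Fixes CELEBORN-1016. An IPv6 host address itself contains colons, so splitting a "host:port" (or "host:port1:port2:port3:port4") string on ":" and taking the first element yields a wrong host and wrong ports.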
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
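Unit tests: added "parse colon separated host ports" to UtilsSuite, covering IPv4 and IPv6 hosts with one and four ports.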
Closes #2293 from turboFei/CELEBORN-1016_ipv6.
Authored-by: Fei Wang <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../java/org/apache/celeborn/client/ShuffleClientImpl.java | 6 +++---
.../scala/org/apache/celeborn/common/meta/WorkerInfo.scala | 5 +++--
.../main/scala/org/apache/celeborn/common/util/Utils.scala | 7 +++++++
.../scala/org/apache/celeborn/common/util/UtilsSuite.scala | 13 +++++++++++++
4 files changed, 26 insertions(+), 5 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b7dc720b4..c3dcd2315 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1278,9 +1278,9 @@ public class ShuffleClientImpl extends ShuffleClient {
PushState pushState,
int remainReviveTimes) {
String hostPort = addressPair.getLeft();
- final String[] splits = hostPort.split(":");
- final String host = splits[0];
- final int port = Integer.parseInt(splits[1]);
+ String[] hostPortArr = Utils.parseColonSeparatedHostPorts(hostPort, 1);
+ final String host = hostPortArr[0];
+ final int port = Integer.parseInt(hostPortArr[1]);
int groupedBatchId = pushState.nextBatchId();
pushState.addBatch(groupedBatchId, hostPort);
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 24c5e5a5e..220663e59 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -28,7 +28,7 @@ import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc.RpcEndpointRef
import org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef
-import org.apache.celeborn.common.util.JavaUtils
+import org.apache.celeborn.common.util.{JavaUtils, Utils}
class WorkerInfo(
val host: String,
@@ -271,7 +271,8 @@ class WorkerInfo(
object WorkerInfo {
def fromUniqueId(id: String): WorkerInfo = {
- val Array(host, rpcPort, pushPort, fetchPort, replicatePort) =
id.split(":")
+ val Array(host, rpcPort, pushPort, fetchPort, replicatePort) =
+ Utils.parseColonSeparatedHostPorts(id, portsNum = 4)
new WorkerInfo(host, rpcPort.toInt, pushPort.toInt, fetchPort.toInt,
replicatePort.toInt)
}
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 2b71e8cb8..73ab6fa79 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1101,4 +1101,11 @@ object Utils extends Logging {
private val dateFmt: FastDateFormat =
FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZ", Locale.ROOT)
def formatTimestamp(timestamp: Long): String = dateFmt.format(timestamp)
+
+ def parseColonSeparatedHostPorts(id: String, portsNum: Int): Array[String] =
{
+ val components = id.split(":")
+ val portsArr = components.takeRight(portsNum)
+ val host = components.dropRight(portsNum).mkString(":")
+ Array(host) ++ portsArr
+ }
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index d33eeb69c..6abf91357 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -178,6 +178,19 @@ class UtilsSuite extends CelebornFunSuite {
assert(transportConf.clientThreads() == 100)
}
+ test("parse colon separated host ports") {
+ val ipV4Host = "192.168.0.1"
+ val ipV6Host = "2600:1f13:9f:2d00:4a70:cc69:737d:7cb0"
+ assert(Utils.parseColonSeparatedHostPorts(s"$ipV4Host:1000",
1).sameElements(
+ Array[Object](ipV4Host, "1000")))
+ assert(Utils.parseColonSeparatedHostPorts(s"$ipV4Host:1:2:3:4",
4).sameElements(
+ Array[Object](ipV4Host, "1", "2", "3", "4")))
+ assert(Utils.parseColonSeparatedHostPorts(s"$ipV6Host:1000",
1).sameElements(
+ Array[Object](ipV6Host, "1000")))
+ assert(Utils.parseColonSeparatedHostPorts(s"$ipV6Host:1:2:3:4",
4).sameElements(
+ Array[Object](ipV6Host, "1", "2", "3", "4")))
+ }
+
def partitionLocation(partitionId: Int): util.HashSet[PartitionLocation] = {
val partitionSet = new util.HashSet[PartitionLocation]
for (i <- 0 until 3) {