This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a05b2fc1 [CELEBORN-1016] Fix IPv6 host address resolve issue
7a05b2fc1 is described below

commit 7a05b2fc182622a11114dce478e83a7ec8b558ed
Author: Fei Wang <[email protected]>
AuthorDate: Sat Feb 17 10:31:49 2024 +0800

    [CELEBORN-1016] Fix IPv6 host address resolve issue
    
    ### What changes were proposed in this pull request?
    
    To close CELEBORN-1016, fix the issue when parsing IPv6 host addresses.
    ### Why are the changes needed?
    
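    Fix CELEBORN-1016: an IPv6 host address itself contains colons, so splitting a
    colon-separated `host:port...` string on ":" and taking the first element as the
    host yields a wrong host (and wrong ports).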
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UT.
    
    Closes #2293 from turboFei/CELEBORN-1016_ipv6.
    
    Authored-by: Fei Wang <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../java/org/apache/celeborn/client/ShuffleClientImpl.java  |  6 +++---
 .../scala/org/apache/celeborn/common/meta/WorkerInfo.scala  |  5 +++--
 .../main/scala/org/apache/celeborn/common/util/Utils.scala  |  7 +++++++
 .../scala/org/apache/celeborn/common/util/UtilsSuite.scala  | 13 +++++++++++++
 4 files changed, 26 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b7dc720b4..c3dcd2315 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1278,9 +1278,9 @@ public class ShuffleClientImpl extends ShuffleClient {
       PushState pushState,
       int remainReviveTimes) {
     String hostPort = addressPair.getLeft();
-    final String[] splits = hostPort.split(":");
-    final String host = splits[0];
-    final int port = Integer.parseInt(splits[1]);
+    String[] hostPortArr = Utils.parseColonSeparatedHostPorts(hostPort, 1);
+    final String host = hostPortArr[0];
+    final int port = Integer.parseInt(hostPortArr[1]);
 
     int groupedBatchId = pushState.nextBatchId();
     pushState.addBatch(groupedBatchId, hostPort);
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 24c5e5a5e..220663e59 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -28,7 +28,7 @@ import org.apache.celeborn.common.protocol.StorageInfo
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.rpc.RpcEndpointRef
 import org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef
-import org.apache.celeborn.common.util.JavaUtils
+import org.apache.celeborn.common.util.{JavaUtils, Utils}
 
 class WorkerInfo(
     val host: String,
@@ -271,7 +271,8 @@ class WorkerInfo(
 object WorkerInfo {
 
   def fromUniqueId(id: String): WorkerInfo = {
-    val Array(host, rpcPort, pushPort, fetchPort, replicatePort) = 
id.split(":")
+    val Array(host, rpcPort, pushPort, fetchPort, replicatePort) =
+      Utils.parseColonSeparatedHostPorts(id, portsNum = 4)
     new WorkerInfo(host, rpcPort.toInt, pushPort.toInt, fetchPort.toInt, 
replicatePort.toInt)
   }
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 2b71e8cb8..73ab6fa79 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1101,4 +1101,11 @@ object Utils extends Logging {
   private val dateFmt: FastDateFormat =
     FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZ", Locale.ROOT)
   def formatTimestamp(timestamp: Long): String = dateFmt.format(timestamp)
+
+  def parseColonSeparatedHostPorts(id: String, portsNum: Int): Array[String] = 
{
+    val components = id.split(":")
+    val portsArr = components.takeRight(portsNum)
+    val host = components.dropRight(portsNum).mkString(":")
+    Array(host) ++ portsArr
+  }
 }
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index d33eeb69c..6abf91357 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -178,6 +178,19 @@ class UtilsSuite extends CelebornFunSuite {
     assert(transportConf.clientThreads() == 100)
   }
 
+  test("parse colon separated host ports") {
+    val ipV4Host = "192.168.0.1"
+    val ipV6Host = "2600:1f13:9f:2d00:4a70:cc69:737d:7cb0"
+    assert(Utils.parseColonSeparatedHostPorts(s"$ipV4Host:1000", 
1).sameElements(
+      Array[Object](ipV4Host, "1000")))
+    assert(Utils.parseColonSeparatedHostPorts(s"$ipV4Host:1:2:3:4", 
4).sameElements(
+      Array[Object](ipV4Host, "1", "2", "3", "4")))
+    assert(Utils.parseColonSeparatedHostPorts(s"$ipV6Host:1000", 
1).sameElements(
+      Array[Object](ipV6Host, "1000")))
+    assert(Utils.parseColonSeparatedHostPorts(s"$ipV6Host:1:2:3:4", 
4).sameElements(
+      Array[Object](ipV6Host, "1", "2", "3", "4")))
+  }
+
   def partitionLocation(partitionId: Int): util.HashSet[PartitionLocation] = {
     val partitionSet = new util.HashSet[PartitionLocation]
     for (i <- 0 until 3) {

Reply via email to