This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e76e44524 [CELEBORN-1494] Support IPv6 addresses in PbSerDeUtils.fromPackedPartitionLocations
e76e44524 is described below

commit e76e445242bc566b4bd43bb230c8cb17eb6f042f
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Tue Jul 9 17:40:12 2024 +0800

    [CELEBORN-1494] Support IPv6 addresses in PbSerDeUtils.fromPackedPartitionLocations
    
    ### What changes were proposed in this pull request?
    
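    When Celeborn runs under IPv6, client-side deserialization fails when worker locations are IPv6 addresses.
    This is because `PbSerDeUtils.fromPackedPartitionLocations` splits the packed worker ID on `":"`, which does not work when the host is an IPv6 address, since the address itself contains colons.
    This change parses the worker ID with `Utils.parseColonSeparatedHostPorts` instead of a plain split.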
    
    ### Why are the changes needed?
    
    To fix deserialization of packed partition locations when workers use IPv6 addresses.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a unit test; it fails without the changes to `PbSerDeUtils.fromPackedPartitionLocations`.
    
    Closes #2606 from mridulm/fix-ip-v6.
    
    Authored-by: Mridul Muralidharan <mridulatgmail.com>
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/common/util/PbSerDeUtils.scala  |  6 ++++--
 .../celeborn/common/util/PbSerDeUtilsTest.scala     | 21 +++++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 77db460ea..a2cd8dbde 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -603,8 +603,10 @@ object PbSerDeUtils {
   private def fromPackedPartitionLocations(
       pbPackedPartitionLocations: PbPackedPartitionLocations,
       index: Int): PartitionLocation = {
-    val workerIdParts = pbPackedPartitionLocations.getWorkerIdsSet(
-      pbPackedPartitionLocations.getWorkerIds(index)).split(":").map(_.trim)
+    val workerIdParts = Utils.parseColonSeparatedHostPorts(
+      pbPackedPartitionLocations.getWorkerIdsSet(
+        pbPackedPartitionLocations.getWorkerIds(index)),
+      4).map(_.trim)
     var filePath = pbPackedPartitionLocations.getFilePaths(index)
     if (filePath != "") {
       filePath = pbPackedPartitionLocations.getMountPointsSet(
diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index 5f5247273..d2eea050e 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -137,6 +137,17 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
         "filePath",
         StorageInfo.LOCAL_DISK_MASK),
       null)
+  val partitionLocationIPv6 =
+    // some random ipv6 address
+    new PartitionLocation(
+      2,
+      2,
+      "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]",
+      30,
+      29,
+      28,
+      27,
+      PartitionLocation.Mode.PRIMARY)
 
   val workerResource = new WorkerResource()
   workerResource.put(
@@ -322,6 +333,16 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
     assert(partitionLocation3 == loc1)
   }
 
+  test("testPackedPartitionLocationPairIPv6") {
+    val pairPb = PbSerDeUtils.toPbPackedPartitionLocationsPair(
+      List(partitionLocationIPv6))
+    val rePb = PbSerDeUtils.fromPbPackedPartitionLocationsPair(pairPb)
+
+    val loc1 = rePb._1.get(0)
+
+    assert(partitionLocationIPv6 == loc1)
+  }
+
   private def testSerializationPerformance(scale: Int): Unit = {
     val mountPoints = List(
       "/mnt/disk1/celeborn/",

Reply via email to