This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e76e44524 [CELEBORN-1494] Support IPv6 addresses in
PbSerDeUtils.fromPackedPartitionLocations
e76e44524 is described below
commit e76e445242bc566b4bd43bb230c8cb17eb6f042f
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Tue Jul 9 17:40:12 2024 +0800
[CELEBORN-1494] Support IPv6 addresses in
PbSerDeUtils.fromPackedPartitionLocations
### What changes were proposed in this pull request?
When Celeborn runs under IPv6, client-side deserialization fails when
worker locations are IPv6 addresses.
This is because `PbSerDeUtils.fromPackedPartitionLocations` splits on
`":"`, which does not work when the host is an IPv6 address, since the
address itself contains colons.
### Why are the changes needed?
Fix IPv6 support
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test added - test fails without changes to
`PbSerDeUtils.fromPackedPartitionLocations`
Closes #2606 from mridulm/fix-ip-v6.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/common/util/PbSerDeUtils.scala | 6 ++++--
.../celeborn/common/util/PbSerDeUtilsTest.scala | 21 +++++++++++++++++++++
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 77db460ea..a2cd8dbde 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -603,8 +603,10 @@ object PbSerDeUtils {
private def fromPackedPartitionLocations(
pbPackedPartitionLocations: PbPackedPartitionLocations,
index: Int): PartitionLocation = {
- val workerIdParts = pbPackedPartitionLocations.getWorkerIdsSet(
- pbPackedPartitionLocations.getWorkerIds(index)).split(":").map(_.trim)
+ val workerIdParts = Utils.parseColonSeparatedHostPorts(
+ pbPackedPartitionLocations.getWorkerIdsSet(
+ pbPackedPartitionLocations.getWorkerIds(index)),
+ 4).map(_.trim)
var filePath = pbPackedPartitionLocations.getFilePaths(index)
if (filePath != "") {
filePath = pbPackedPartitionLocations.getMountPointsSet(
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index 5f5247273..d2eea050e 100644
---
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -137,6 +137,17 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
"filePath",
StorageInfo.LOCAL_DISK_MASK),
null)
+ val partitionLocationIPv6 =
+ // some random ipv6 address
+ new PartitionLocation(
+ 2,
+ 2,
+ "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]",
+ 30,
+ 29,
+ 28,
+ 27,
+ PartitionLocation.Mode.PRIMARY)
val workerResource = new WorkerResource()
workerResource.put(
@@ -322,6 +333,16 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(partitionLocation3 == loc1)
}
+ test("testPackedPartitionLocationPairIPv6") {
+ val pairPb = PbSerDeUtils.toPbPackedPartitionLocationsPair(
+ List(partitionLocationIPv6))
+ val rePb = PbSerDeUtils.fromPbPackedPartitionLocationsPair(pairPb)
+
+ val loc1 = rePb._1.get(0)
+
+ assert(partitionLocationIPv6 == loc1)
+ }
+
private def testSerializationPerformance(scale: Int): Unit = {
val mountPoints = List(
"/mnt/disk1/celeborn/",