This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 10530faf3 [CELEBORN-1692] Set mount point in fromPbFileInfoMap
10530faf3 is described below

commit 10530faf3c96c507e26959a0a7ae97bb8dc58ba0
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 14:12:42 2024 +0800

    [CELEBORN-1692] Set mount point in fromPbFileInfoMap
---
 .../apache/celeborn/common/util/PbSerDeUtils.scala | 11 ++++++--
 .../celeborn/common/util/PbSerDeUtilsTest.scala    | 33 ++++++++++++++++++++--
 .../deploy/worker/storage/StorageManager.scala     |  2 +-
 3 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index e50f76350..d577ec9f2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.InvalidProtocolBufferException
 
 import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, 
ApplicationMeta, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, 
WorkerEventInfo, WorkerInfo, WorkerStatus}
+import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, 
ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, 
ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.protocol.PartitionLocation.Mode
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
@@ -131,7 +131,8 @@ object PbSerDeUtils {
   @throws[InvalidProtocolBufferException]
   def fromPbFileInfoMap(
       data: Array[Byte],
-      cache: ConcurrentHashMap[String, UserIdentifier]): 
ConcurrentHashMap[String, DiskFileInfo] = {
+      cache: ConcurrentHashMap[String, UserIdentifier],
+      mountPoints: util.HashSet[String]): ConcurrentHashMap[String, 
DiskFileInfo] = {
     val pbFileInfoMap = PbFileInfoMap.parseFrom(data)
     val fileInfoMap = JavaUtils.newConcurrentHashMap[String, DiskFileInfo]
     pbFileInfoMap.getValuesMap.entrySet().asScala.foreach { entry =>
@@ -141,10 +142,16 @@ object PbSerDeUtils {
       val userIdentifierKey = pbUserIdentifier.getTenantId + "-" + 
pbUserIdentifier.getName
       if (!cache.containsKey(userIdentifierKey)) {
         val fileInfo = fromPbFileInfo(pbFileInfo)
+        if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
+          
fileInfo.setMountPoint(DeviceInfo.getMountPoint(fileInfo.getFilePath, 
mountPoints))
+        }
         cache.put(userIdentifierKey, fileInfo.getUserIdentifier)
         fileInfoMap.put(fileName, fileInfo)
       } else {
         val fileInfo = fromPbFileInfo(pbFileInfo, cache.get(userIdentifierKey))
+        if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
+          
fileInfo.setMountPoint(DeviceInfo.getMountPoint(fileInfo.getFilePath, 
mountPoints))
+        }
         fileInfoMap.put(fileName, fileInfo)
       }
     }
diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index d2eea050e..485662cda 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, 
DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, 
WorkerStatus}
+import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, 
DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, 
WorkerInfo, WorkerStatus}
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PbPackedWorkerResource, PbWorkerResource, StorageInfo}
 import 
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
 import org.apache.celeborn.common.quota.ResourceConsumption
@@ -75,9 +75,28 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
     new ReduceFileMeta(chunkOffsets2, 123),
     file2.getAbsolutePath,
     6000L)
+  val mapFileInfo1 = new DiskFileInfo(
+    userIdentifier1,
+    true,
+    new MapFileMeta(1024, 10),
+    file1.getAbsolutePath,
+    6000L)
+  val mapFileInfo2 = new DiskFileInfo(
+    userIdentifier2,
+    true,
+    new MapFileMeta(1024, 10),
+    file2.getAbsolutePath,
+    6000L)
   val fileInfoMap = JavaUtils.newConcurrentHashMap[String, DiskFileInfo]()
+  mapFileInfo1.setMountPoint("/mnt")
+  mapFileInfo2.setMountPoint("/mnt")
+
   fileInfoMap.put("file1", fileInfo1)
   fileInfoMap.put("file2", fileInfo2)
+  fileInfoMap.put("mapFile1", mapFileInfo1)
+  fileInfoMap.put("mapFile2", mapFileInfo2)
+  val mountPoints = new util.HashSet[String]
+  mountPoints.add("/mnt")
   val cache = JavaUtils.newConcurrentHashMap[String, UserIdentifier]()
 
   val resourceConsumption1 = ResourceConsumption(1000, 2000, 3000, 4000)
@@ -204,7 +223,7 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
 
   test("fromAndToPbFileInfoMap") {
     val pbFileInfoMap = PbSerDeUtils.toPbFileInfoMap(fileInfoMap)
-    val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap, 
cache)
+    val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap, 
cache, mountPoints)
     val restoredFileInfo1 = restoredFileInfoMap.get("file1")
     val restoredFileInfo2 = restoredFileInfoMap.get("file2")
 
@@ -221,6 +240,16 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
     
assert(restoredFileInfo2.getUserIdentifier.equals(fileInfo2.getUserIdentifier))
   }
 
+  test("fromAndToPBFileInfoMapMountPoint") {
+    val pbFileInfoMap = PbSerDeUtils.toPbFileInfoMap(fileInfoMap)
+    val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap, 
cache, mountPoints)
+    val restoredFileInfo1 = restoredFileInfoMap.get("mapFile1")
+    val restoredFileInfo2 = restoredFileInfoMap.get("mapFile2")
+
+    assert(restoredFileInfo1.getMountPoint.equals(mapFileInfo1.getMountPoint))
+    assert(restoredFileInfo2.getMountPoint.equals(mapFileInfo2.getMountPoint))
+  }
+
   test("fromAndToPbUserIdentifier") {
     val pbUserIdentifier = PbSerDeUtils.toPbUserIdentifier(userIdentifier1)
     val restoredUserIdentifier = 
PbSerDeUtils.fromPbUserIdentifier(pbUserIdentifier)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 32f4ea5c7..1becfed76 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -281,7 +281,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
         if (key.startsWith(SHUFFLE_KEY_PREFIX)) {
           val shuffleKey = parseDbShuffleKey(key)
           try {
-            val files = PbSerDeUtils.fromPbFileInfoMap(entry.getValue, cache)
+            val files = PbSerDeUtils.fromPbFileInfoMap(entry.getValue, cache, 
mountPoints)
             logDebug(s"Reload DB: $shuffleKey -> $files")
             diskFileInfos.put(shuffleKey, files)
             committedFileInfos.put(shuffleKey, files)

Reply via email to