This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 10530faf3 [CELEBORN-1692] Set mount point in fromPbFileInfoMap
10530faf3 is described below
commit 10530faf3c96c507e26959a0a7ae97bb8dc58ba0
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 14:12:42 2024 +0800
[CELEBORN-1692] Set mount point in fromPbFileInfoMap
---
.../apache/celeborn/common/util/PbSerDeUtils.scala | 11 ++++++--
.../celeborn/common/util/PbSerDeUtilsTest.scala | 33 ++++++++++++++++++++--
.../deploy/worker/storage/StorageManager.scala | 2 +-
3 files changed, 41 insertions(+), 5 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index e50f76350..d577ec9f2 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import com.google.protobuf.InvalidProtocolBufferException
import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot,
ApplicationMeta, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta,
WorkerEventInfo, WorkerInfo, WorkerStatus}
+import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot,
ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta,
ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.PartitionLocation.Mode
import
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
@@ -131,7 +131,8 @@ object PbSerDeUtils {
@throws[InvalidProtocolBufferException]
def fromPbFileInfoMap(
data: Array[Byte],
- cache: ConcurrentHashMap[String, UserIdentifier]):
ConcurrentHashMap[String, DiskFileInfo] = {
+ cache: ConcurrentHashMap[String, UserIdentifier],
+ mountPoints: util.HashSet[String]): ConcurrentHashMap[String,
DiskFileInfo] = {
val pbFileInfoMap = PbFileInfoMap.parseFrom(data)
val fileInfoMap = JavaUtils.newConcurrentHashMap[String, DiskFileInfo]
pbFileInfoMap.getValuesMap.entrySet().asScala.foreach { entry =>
@@ -141,10 +142,16 @@ object PbSerDeUtils {
val userIdentifierKey = pbUserIdentifier.getTenantId + "-" +
pbUserIdentifier.getName
if (!cache.containsKey(userIdentifierKey)) {
val fileInfo = fromPbFileInfo(pbFileInfo)
+ if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
+
fileInfo.setMountPoint(DeviceInfo.getMountPoint(fileInfo.getFilePath,
mountPoints))
+ }
cache.put(userIdentifierKey, fileInfo.getUserIdentifier)
fileInfoMap.put(fileName, fileInfo)
} else {
val fileInfo = fromPbFileInfo(pbFileInfo, cache.get(userIdentifierKey))
+ if (fileInfo.getFileMeta.isInstanceOf[MapFileMeta]) {
+
fileInfo.setMountPoint(DeviceInfo.getMountPoint(fileInfo.getFilePath,
mountPoints))
+ }
fileInfoMap.put(fileName, fileInfo)
}
}
diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index d2eea050e..485662cda 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -27,7 +27,7 @@ import
org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo,
DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo,
WorkerStatus}
+import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo,
DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo,
WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol.{PartitionLocation,
PbPackedWorkerResource, PbWorkerResource, StorageInfo}
import
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
import org.apache.celeborn.common.quota.ResourceConsumption
@@ -75,9 +75,28 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
new ReduceFileMeta(chunkOffsets2, 123),
file2.getAbsolutePath,
6000L)
+ val mapFileInfo1 = new DiskFileInfo(
+ userIdentifier1,
+ true,
+ new MapFileMeta(1024, 10),
+ file1.getAbsolutePath,
+ 6000L)
+ val mapFileInfo2 = new DiskFileInfo(
+ userIdentifier2,
+ true,
+ new MapFileMeta(1024, 10),
+ file2.getAbsolutePath,
+ 6000L)
val fileInfoMap = JavaUtils.newConcurrentHashMap[String, DiskFileInfo]()
+ mapFileInfo1.setMountPoint("/mnt")
+ mapFileInfo2.setMountPoint("/mnt")
+
fileInfoMap.put("file1", fileInfo1)
fileInfoMap.put("file2", fileInfo2)
+ fileInfoMap.put("mapFile1", mapFileInfo1)
+ fileInfoMap.put("mapFile2", mapFileInfo2)
+ val mountPoints = new util.HashSet[String]
+ mountPoints.add("/mnt")
val cache = JavaUtils.newConcurrentHashMap[String, UserIdentifier]()
val resourceConsumption1 = ResourceConsumption(1000, 2000, 3000, 4000)
@@ -204,7 +223,7 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
test("fromAndToPbFileInfoMap") {
val pbFileInfoMap = PbSerDeUtils.toPbFileInfoMap(fileInfoMap)
- val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap,
cache)
+ val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap,
cache, mountPoints)
val restoredFileInfo1 = restoredFileInfoMap.get("file1")
val restoredFileInfo2 = restoredFileInfoMap.get("file2")
@@ -221,6 +240,16 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredFileInfo2.getUserIdentifier.equals(fileInfo2.getUserIdentifier))
}
+ test("fromAndToPBFileInfoMapMountPoint") {
+ val pbFileInfoMap = PbSerDeUtils.toPbFileInfoMap(fileInfoMap)
+ val restoredFileInfoMap = PbSerDeUtils.fromPbFileInfoMap(pbFileInfoMap,
cache, mountPoints)
+ val restoredFileInfo1 = restoredFileInfoMap.get("mapFile1")
+ val restoredFileInfo2 = restoredFileInfoMap.get("mapFile2")
+
+ assert(restoredFileInfo1.getMountPoint.equals(mapFileInfo1.getMountPoint))
+ assert(restoredFileInfo2.getMountPoint.equals(mapFileInfo2.getMountPoint))
+ }
+
test("fromAndToPbUserIdentifier") {
val pbUserIdentifier = PbSerDeUtils.toPbUserIdentifier(userIdentifier1)
val restoredUserIdentifier =
PbSerDeUtils.fromPbUserIdentifier(pbUserIdentifier)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 32f4ea5c7..1becfed76 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -281,7 +281,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (key.startsWith(SHUFFLE_KEY_PREFIX)) {
val shuffleKey = parseDbShuffleKey(key)
try {
- val files = PbSerDeUtils.fromPbFileInfoMap(entry.getValue, cache)
+ val files = PbSerDeUtils.fromPbFileInfoMap(entry.getValue, cache,
mountPoints)
logDebug(s"Reload DB: $shuffleKey -> $files")
diskFileInfos.put(shuffleKey, files)
committedFileInfos.put(shuffleKey, files)