RexXiong commented on code in PR #3353:
URL: https://github.com/apache/celeborn/pull/3353#discussion_r2192760175
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -1241,6 +1241,30 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
}.getOrElse("")
}
+ def remoteStorageDirs: Option[Set[(StorageInfo.Type, String)]] = {
+ val supported = Seq(
+ (StorageInfo.Type.HDFS, HDFS_DIR, Utils.isHdfsPath _),
+ (StorageInfo.Type.S3, S3_DIR, Utils.isS3Path _),
+ (StorageInfo.Type.OSS, OSS_DIR, Utils.isOssPath _))
+
+ val activeNames = get(ACTIVE_STORAGE_TYPES)
+
+ val validDirs = supported.flatMap { case (ty, e, checker) =>
+ if (!activeNames.contains(ty.name)) None
Review Comment:
Split activeNames to List then we can contain the exactly storage type
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -86,27 +85,20 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
(new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread,
storageType)
}
- if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage &&
!hasOssStorage) {
+ if (workingDirInfos.size <= 0 && remoteStorageDirs.isEmpty) {
throw new IOException("Empty working directory configuration!")
}
DeviceInfo.getDeviceAndDiskInfos(workingDirInfos, conf)
}
val mountPoints = new util.HashSet[String](diskInfos.keySet())
- val hdfsDiskInfo =
- if (conf.hasHDFSStorage)
- Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.HDFS))
- else None
-
- val s3DiskInfo =
- if (conf.hasS3Storage)
- Option(new DiskInfo("S3", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.S3))
- else None
- val ossDiskInfo =
- if (conf.hasOssStorage)
- Option(new DiskInfo("OSS", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.OSS))
- else None
+ val remoteDiskInfos: Option[Set[DiskInfo]] = remoteStorageDirs.flatMap {
dirs =>
+ val diskInfoSet = dirs.flatMap { case (storageInfoType, _) =>
Review Comment:
dirs.map is better?
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java:
##########
@@ -287,20 +288,14 @@ public void updateWorkerHeartbeatMeta(
long unhealthyDiskNum =
disks.values().stream().filter(s ->
!s.status().equals(DiskStatus.HEALTHY)).count();
boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >=
unhealthyDiskRatioThreshold;
+ Option<scala.collection.immutable.Set<Tuple2<StorageInfo.Type, String>>>
remoteStorageDirs =
Review Comment:
Let's introduce a final variable for `conf.remoteStorageDirs().isDefined()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]