turboFei commented on code in PR #2233:
URL:
https://github.com/apache/incubator-celeborn/pull/2233#discussion_r1490201657
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala:
##########
@@ -317,4 +319,105 @@ object DeviceMonitor extends Logging {
}
def EmptyMonitor(): DeviceMonitor = EmptyDeviceMonitor
+
+ /**
+ * @param workingDirs array of (workingDir, max usable space, flush thread
count, storage type)
+ * @return it will return two maps
+ * (deviceName -> deviceInfo)
+ * (mount point -> diskInfo)
+ */
+ def getDeviceAndDiskInfos(
+ workingDirs: Seq[(File, Long, Int, StorageInfo.Type)],
+ conf: CelebornConf): (util.Map[String, DeviceInfo], util.Map[String,
DiskInfo]) = {
+ val deviceNameToDeviceInfo = new util.HashMap[String, DeviceInfo]()
+ val mountPointToDeviceInfo = new util.HashMap[String, DeviceInfo]()
+
+ val dfResult = runCommand("df -ah").trim
+ logger.info(s"df result\n$dfResult")
+ // (/dev/vdb, /mnt/disk1)
+ val fsMounts = dfResult
+ .split("[\n\r]")
+ .tail
+ .map(line => {
+ val tokens = line.trim.split("[ \t]+")
+ (tokens.head, tokens.last)
+ })
+
+ // (vda, vdb)
+ val lsBlockResult = runCommand("ls /sys/block/").trim
+ logger.info(s"ls block\n$lsBlockResult")
+ val blocks = lsBlockResult.split("[ \n\r\t]+")
+
+ fsMounts.foreach { case (fileSystem, mountPoint) =>
+ val deviceName = fileSystem.substring(fileSystem.lastIndexOf('/') + 1)
+ var index = -1
+ var maxLength = -1
+ blocks.zipWithIndex.foreach(block => {
+ if (deviceName.startsWith(block._1) && block._1.length > maxLength) {
+ index = block._2
+ maxLength = block._1.length
+ }
+ })
+
+ val newDeviceInfoFunc =
+ new util.function.Function[String, DeviceInfo]() {
+ override def apply(s: String): DeviceInfo = {
+ val deviceInfo = new DeviceInfo(s)
+ if (index < 0) {
+ // device not found in /sys/block/
+ deviceInfo.deviceStatAvailable = false
+ }
+ deviceInfo
+ }
+ }
+
+ val deviceInfo =
+ if (index >= 0) {
+ deviceNameToDeviceInfo.computeIfAbsent(blocks(index),
newDeviceInfoFunc)
+ } else {
+ deviceNameToDeviceInfo.computeIfAbsent(deviceName, newDeviceInfoFunc)
+ }
+ mountPointToDeviceInfo.putIfAbsent(mountPoint, deviceInfo)
+ }
+
+ val retDeviceInfos = JavaUtils.newConcurrentHashMap[String, DeviceInfo]()
+ val retDiskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
+
+ workingDirs.groupBy { case (dir, _, _, _) =>
+ getMountPoint(dir.getCanonicalPath, mountPointToDeviceInfo.keySet())
+ }.foreach {
+ case (mountPoint, dirs) =>
+ if (mountPoint.nonEmpty) {
+ val deviceInfo = mountPointToDeviceInfo.get(mountPoint)
+ val diskInfo = new DiskInfo(
+ mountPoint,
+ dirs.map(_._1).toList,
+ deviceInfo,
+ conf)
+ val (dir, maxUsableSpace, threadCount, storageType) = dirs(0)
+ diskInfo.configuredUsableSpace = maxUsableSpace
+ diskInfo.threadCount = threadCount
+ diskInfo.storageType = storageType
+ if (readWriteError(conf, dir)) {
+ diskInfo.status = DiskStatus.READ_OR_WRITE_FAILURE
+ }
Review Comment:
Just move the code from DeviceInfo.scala to DeviceMonitor.scala and check
`readWriteError` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]