Copilot commented on code in PR #3597:
URL: https://github.com/apache/celeborn/pull/3597#discussion_r2778562928


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala:
##########
@@ -514,10 +513,10 @@ private[celeborn] class Worker(
     activeShuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
     activeShuffleKeys.addAll(storageManager.shuffleKeySet())
     storageManager.updateDiskInfos()
-    val diskInfos =
-      workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk =>
-        disk.mountPoint -> disk
-      }.toMap.asJava).values().asScala.toSeq ++ storageManager.remoteDiskInfos.getOrElse(Set.empty)
+    val currentDiskMap = storageManager.localDisksSnapshot().map { disk =>

Review Comment:
   Heartbeat updates only pass `localDisksSnapshot()` into 
`workerInfo.updateThenGetDiskInfos(...)`. Since `updateThenGetDiskInfos` 
removes mount points missing from the update map, this will drop the remote 
disk entries that were added at registration, so remote disks stop being 
reported after the first heartbeat. Build the update map from 
`allDisksSnapshot()` (or merge in `remoteDiskInfos`) so remote disks remain 
present.
   ```suggestion
       val currentDiskMap = storageManager.allDisksSnapshot().map { disk =>
   ```
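
   For illustration, here is a standalone sketch of the reconciliation semantics described above (names are hypothetical, not Celeborn APIs): mount points absent from the heartbeat's update map are removed from the master-side view, so a heartbeat built from local disks alone silently drops the remote entries.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Simplified model of "update then get" semantics: entries whose mount
   // points are missing from the update map are removed, the rest refreshed.
   public class HeartbeatModel {
     static Map<String, String> updateThenGet(Map<String, String> current, Map<String, String> update) {
       current.keySet().retainAll(update.keySet()); // drop mount points missing from the update
       current.putAll(update);                      // refresh / add the reported ones
       return current;
     }

     public static void main(String[] args) {
       Map<String, String> masterView = new HashMap<>();
       masterView.put("/mnt/disk1", "LOCAL");
       masterView.put("hdfs://ns1/celeborn", "HDFS"); // added at registration
       // Heartbeat built from local disks only, as in the current code:
       updateThenGet(masterView, Map.of("/mnt/disk1", "LOCAL"));
       System.out.println(masterView.containsKey("hdfs://ns1/celeborn")); // prints false
     }
   }
   ```

   Building the update map from all disks (local plus remote) keeps the remote entry alive across heartbeats.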



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -69,32 +77,11 @@ static class UsableDiskInfo {
     Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions = new HashMap<>();
     for (WorkerInfo worker : workers) {
       List<UsableDiskInfo> usableDisks =
-          slotsRestrictions.computeIfAbsent(worker, v -> new ArrayList<>());
-      for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
-        if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
-          if (StorageInfo.localDiskAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.OSS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.HDFSAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.HDFS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.S3Available(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.OSSAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.OSS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
-          }
+          slotsRestrictions.computeIfAbsent(worker, v -> new LinkedList<>());
+      for (DiskInfo diskInfo : worker.diskInfos().values()) {
+        if (diskInfo.status().equals(DiskStatus.HEALTHY)
+            && StorageInfo.isAvailable(diskInfo.storageType(), availableStorageTypes)) {
+          usableDisks.add(new UsableDiskInfo(diskInfo));
         }

Review Comment:
   `usableDisks` is created as a `LinkedList`, but later code accesses it by 
index via `usableDiskInfos.get(diskIndex)` in `getStorageInfo(...)`. With 
`LinkedList` this becomes O(n) per access and can significantly slow slot 
allocation for large numbers of partitions/disks. Use an `ArrayList` for 
`usableDisks` here.
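
   A minimal illustration of the indexed-access pattern at issue (a standalone sketch, not the actual `getStorageInfo(...)` code): the result is identical for both list types, but each `get(i)` on a `LinkedList` walks the chain from an end, while on an `ArrayList` it is a constant-time array lookup.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedList;
   import java.util.List;

   // Sum sampled entries via positional list.get(i), the access pattern
   // used when picking disks by index during slot allocation.
   public class IndexedAccess {
     static long sampleSum(List<Integer> list, int step) {
       long sum = 0;
       for (int i = 0; i < list.size(); i += step) {
         sum += list.get(i); // O(i) on LinkedList, O(1) on ArrayList
       }
       return sum;
     }

     public static void main(String[] args) {
       List<Integer> linked = new LinkedList<>();
       List<Integer> array = new ArrayList<>();
       for (int i = 0; i < 100_000; i++) {
         linked.add(i);
         array.add(i);
       }
       // Same result either way; only the traversal cost differs.
       System.out.println(sampleSum(linked, 1_000)); // prints 4950000
       System.out.println(sampleSum(array, 1_000));  // prints 4950000
     }
   }
   ```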



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -102,19 +102,23 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
     if (diskInfoSet.nonEmpty) Some(diskInfoSet) else None
   }
 
-  def disksSnapshot(): List[DiskInfo] = {
+  def localDisksSnapshot(): List[DiskInfo] = {
     diskInfos.synchronized {
       val disks = new util.ArrayList[DiskInfo](diskInfos.values())
       disks.asScala.toList
     }
   }
 
-  def healthyWorkingDirs(): List[File] =
-    disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)
+  def allDisksSnapshot(): List[DiskInfo] = {
+    localDisksSnapshot() ++ remoteDiskInfos.getOrElse(Nil)

Review Comment:
   `allDisksSnapshot()` currently won’t compile: `remoteDiskInfos` is an 
`Option[Set[DiskInfo]]`, but `getOrElse(Nil)` returns a `List` (`Nil`). Use 
`getOrElse(Set.empty)` (and optionally `.toList`) or change `remoteDiskInfos` 
to a sequence type so the concatenation is type-safe.
   ```suggestion
    localDisksSnapshot() ++ remoteDiskInfos.getOrElse(Set.empty[DiskInfo]).toList
   ```



##########
common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala:
##########
@@ -214,25 +214,21 @@ class WorkerInfo(
       for (newDisk <- newDiskInfos.values().asScala) {
         val mountPoint: String = newDisk.mountPoint
         val curDisk = diskInfos.get(mountPoint)
+        if (estimatedPartitionSize.nonEmpty && !newDisk.storageType.isDFS) {
+          newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
+          newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
+        }
         if (curDisk != null) {
           curDisk.actualUsableSpace = newDisk.actualUsableSpace
           curDisk.totalSpace = newDisk.totalSpace
           // Update master's diskinfo activeslots to worker's value
           curDisk.activeSlots = newDisk.activeSlots
           curDisk.avgFlushTime = newDisk.avgFlushTime
           curDisk.avgFetchTime = newDisk.avgFetchTime
-          if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS
-            && curDisk.storageType != StorageInfo.Type.S3 && curDisk.storageType != StorageInfo.Type.OSS) {
-            curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
-            curDisk.availableSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
-          }
+          curDisk.maxSlots = newDisk.maxSlots
+          curDisk.availableSlots = newDisk.availableSlots

Review Comment:
   `updateThenGetDiskInfos` now copies `newDisk.maxSlots/availableSlots` into 
the existing `curDisk` for all disk types. For DFS/remote disks, the 
worker-side `DiskInfo` often has `maxSlots/availableSlots` left at default (0), 
while the master computes usable slots during registration via 
`updateDiskSlots(...)`. This change can therefore zero out the master-computed 
slots for remote disks on the first heartbeat, making them unusable. Consider 
recomputing slots on the master for DFS disks as well, or avoid overwriting 
`curDisk.maxSlots/availableSlots` for `storageType.isDFS` disks unless the 
worker provides meaningful values.
   ```suggestion
            // Only overwrite slot-related fields when worker reports meaningful values.
             if (newDisk.maxSlots > 0) {
               curDisk.maxSlots = newDisk.maxSlots
             }
             if (newDisk.availableSlots > 0) {
               curDisk.availableSlots = newDisk.availableSlots
             }
   ```
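
   A toy model of the hazard (field names mirror `DiskInfo`, but this is a standalone sketch, not Celeborn code): an unconditional copy lets a DFS disk whose worker-side slots are left at 0 wipe the master-computed values, while the guarded copy preserves them.

   ```java
   // Minimal stand-in for the slot-carrying part of DiskInfo.
   class Disk {
     long maxSlots;
     long availableSlots;
   }

   public class SlotOverwrite {
     // Unconditional copy: a heartbeat reporting 0 zeroes the master's values.
     static void copyAlways(Disk cur, Disk reported) {
       cur.maxSlots = reported.maxSlots;
       cur.availableSlots = reported.availableSlots;
     }

     // Guarded copy, as suggested: keep the master's values unless the
     // worker reports something meaningful.
     static void copyGuarded(Disk cur, Disk reported) {
       if (reported.maxSlots > 0) cur.maxSlots = reported.maxSlots;
       if (reported.availableSlots > 0) cur.availableSlots = reported.availableSlots;
     }

     public static void main(String[] args) {
       Disk master = new Disk();
       master.maxSlots = 128;       // computed on the master at registration
       master.availableSlots = 100;
       Disk heartbeat = new Disk(); // DFS disk: worker leaves slots at 0

       copyGuarded(master, heartbeat);
       System.out.println(master.maxSlots); // prints 128, preserved
     }
   }
   ```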



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -157,9 +144,7 @@ static class UsableDiskInfo {
                       diskToWorkerMap.put(diskInfo, i);
                       if (diskInfo.actualUsableSpace() > 0
                           && diskInfo.status().equals(DiskStatus.HEALTHY)
-                          && diskInfo.storageType() != StorageInfo.Type.HDFS
-                          && diskInfo.storageType() != StorageInfo.Type.S3
-                          && diskInfo.storageType() != StorageInfo.Type.OSS) {
+                          && !diskInfo.storageType().isDFS()) {
                         usableDisks.add(diskInfo);
                       }
                     }));

Review Comment:
   `usableDisks` was switched to `LinkedList`, but `placeDisksToGroups(...)` 
sorts it and repeatedly uses `subList(start, end)`. With `LinkedList`, these 
operations have extra traversal overhead compared to `ArrayList`. Unless 
there’s a specific need for O(1) inserts in the middle (not apparent here), 
prefer `ArrayList` for better performance.
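
   A hypothetical sketch of the sort-then-`subList` pattern the comment refers to (loosely modelled on `placeDisksToGroups(...)`, not the actual code): `subList` returns a view that delegates positional access to the backing list, so an `ArrayList` backing keeps each access O(1), while a `LinkedList` backing pays a chain traversal on every `get`.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Partition items into roughly equal, sorted groups via subList views.
   public class GroupPartition {
     static List<List<Integer>> partition(List<Integer> items, int groups) {
       List<Integer> sorted = new ArrayList<>(items); // ArrayList backing: cheap subList access
       sorted.sort(null);                             // natural ordering
       List<List<Integer>> out = new ArrayList<>();
       int size = sorted.size() / groups;
       for (int g = 0; g < groups; g++) {
         int end = (g == groups - 1) ? sorted.size() : (g + 1) * size;
         out.add(sorted.subList(g * size, end));      // view over the sorted backing list
       }
       return out;
     }

     public static void main(String[] args) {
       List<List<Integer>> g = partition(Arrays.asList(5, 3, 9, 1, 7, 2, 8, 4, 6, 0), 3);
       System.out.println(g); // prints [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
     }
   }
   ```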



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
