Copilot commented on code in PR #3597:
URL: https://github.com/apache/celeborn/pull/3597#discussion_r2778562928
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala:
##########
@@ -514,10 +513,10 @@ private[celeborn] class Worker(
activeShuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
activeShuffleKeys.addAll(storageManager.shuffleKeySet())
storageManager.updateDiskInfos()
-      val diskInfos =
-        workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk =>
-          disk.mountPoint -> disk
-        }.toMap.asJava).values().asScala.toSeq ++
-          storageManager.remoteDiskInfos.getOrElse(Set.empty)
+      val currentDiskMap = storageManager.localDisksSnapshot().map { disk =>
Review Comment:
Heartbeat updates only pass `localDisksSnapshot()` into
`workerInfo.updateThenGetDiskInfos(...)`. Since `updateThenGetDiskInfos`
removes mount points missing from the update map, this will drop the remote
disk entries that were added at registration, so remote disks stop being
reported after the first heartbeat. Build the update map from
`allDisksSnapshot()` (or merge in `remoteDiskInfos`) so remote disks remain
present.
```suggestion
val currentDiskMap = storageManager.allDisksSnapshot().map { disk =>
```
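To make the failure mode concrete, here is a minimal standalone sketch (illustrative names only, not Celeborn code) of the retain-then-put update semantics the comment describes: any key absent from the update map is dropped, which is exactly how a heartbeat built only from local disks loses the remote disks registered earlier.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not Celeborn code): update-by-replacement semantics.
// If the update map omits a key, a retain-then-put style update (analogous
// to updateThenGetDiskInfos removing missing mount points) drops that entry.
public class ReplacementUpdateDemo {
  static void applyUpdate(Map<String, Integer> current, Map<String, Integer> update) {
    current.keySet().retainAll(update.keySet()); // drop keys missing from the update
    current.putAll(update);
  }

  public static void main(String[] args) {
    Map<String, Integer> disks = new HashMap<>();
    disks.put("/mnt/disk1", 100); // local disk: present in every heartbeat
    disks.put("hdfs://ns1", 50);  // remote disk: added once at registration

    Map<String, Integer> heartbeat = new HashMap<>();
    heartbeat.put("/mnt/disk1", 90); // heartbeat built from local disks only

    applyUpdate(disks, heartbeat);
    System.out.println(disks.containsKey("hdfs://ns1")); // prints "false"
  }
}
```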
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -69,32 +77,11 @@ static class UsableDiskInfo {
Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions = new HashMap<>();
for (WorkerInfo worker : workers) {
List<UsableDiskInfo> usableDisks =
-          slotsRestrictions.computeIfAbsent(worker, v -> new ArrayList<>());
-      for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
-        if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
-          if (StorageInfo.localDiskAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.OSS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.HDFSAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.HDFS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.S3Available(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
-          } else if (StorageInfo.OSSAvailable(availableStorageTypes)
-              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.OSS) {
-            usableDisks.add(
-                new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
-          }
+          slotsRestrictions.computeIfAbsent(worker, v -> new LinkedList<>());
+      for (DiskInfo diskInfo : worker.diskInfos().values()) {
+        if (diskInfo.status().equals(DiskStatus.HEALTHY)
+            && StorageInfo.isAvailable(diskInfo.storageType(), availableStorageTypes)) {
+          usableDisks.add(new UsableDiskInfo(diskInfo));
        }
Review Comment:
`usableDisks` is created as a `LinkedList`, but later code accesses it by
index via `usableDiskInfos.get(diskIndex)` in `getStorageInfo(...)`. With
`LinkedList` this becomes O(n) per access and can significantly slow slot
allocation for large numbers of partitions/disks. Use an `ArrayList` for
`usableDisks` here.
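As a standalone illustration of the cost difference (illustrative names only, not Celeborn code): `get(i)` on a `LinkedList` must walk nodes from the nearest end, so a loop of n indexed reads is O(n^2), versus O(n) on an `ArrayList`.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Illustrative sketch (not Celeborn code): indexed reads over a List.
// Both list types yield the same result; only the access cost differs.
public class IndexedAccessDemo {
  static long sumByIndex(List<Integer> list) {
    long sum = 0;
    for (int i = 0; i < list.size(); i++) {
      sum += list.get(i); // O(1) for ArrayList, O(n) node walk for LinkedList
    }
    return sum;
  }

  public static void main(String[] args) {
    List<Integer> arrayBacked = new ArrayList<>();
    List<Integer> nodeBacked = new LinkedList<>();
    for (int i = 0; i < 10_000; i++) {
      arrayBacked.add(i);
      nodeBacked.add(i);
    }
    System.out.println(sumByIndex(arrayBacked) == sumByIndex(nodeBacked)); // prints "true"
  }
}
```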
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -102,19 +102,23 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (diskInfoSet.nonEmpty) Some(diskInfoSet) else None
}
- def disksSnapshot(): List[DiskInfo] = {
+ def localDisksSnapshot(): List[DiskInfo] = {
diskInfos.synchronized {
val disks = new util.ArrayList[DiskInfo](diskInfos.values())
disks.asScala.toList
}
}
- def healthyWorkingDirs(): List[File] =
- disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)
+ def allDisksSnapshot(): List[DiskInfo] = {
+ localDisksSnapshot() ++ remoteDiskInfos.getOrElse(Nil)
Review Comment:
`allDisksSnapshot()` currently won’t compile: `remoteDiskInfos` is an
`Option[Set[DiskInfo]]`, but `getOrElse(Nil)` returns a `List` (`Nil`). Use
`getOrElse(Set.empty)` (and optionally `.toList`) or change `remoteDiskInfos`
to a sequence type so the concatenation is type-safe.
```suggestion
localDisksSnapshot() ++
remoteDiskInfos.getOrElse(Set.empty[DiskInfo]).toList
```
##########
common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala:
##########
@@ -214,25 +214,21 @@ class WorkerInfo(
for (newDisk <- newDiskInfos.values().asScala) {
val mountPoint: String = newDisk.mountPoint
val curDisk = diskInfos.get(mountPoint)
+ if (estimatedPartitionSize.nonEmpty && !newDisk.storageType.isDFS) {
+ newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
+        newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
+ }
if (curDisk != null) {
curDisk.actualUsableSpace = newDisk.actualUsableSpace
curDisk.totalSpace = newDisk.totalSpace
// Update master's diskinfo activeslots to worker's value
curDisk.activeSlots = newDisk.activeSlots
curDisk.avgFlushTime = newDisk.avgFlushTime
curDisk.avgFetchTime = newDisk.avgFetchTime
-        if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS
-          && curDisk.storageType != StorageInfo.Type.S3 && curDisk.storageType != StorageInfo.Type.OSS) {
-          curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
-          curDisk.availableSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
-        }
+ curDisk.maxSlots = newDisk.maxSlots
+ curDisk.availableSlots = newDisk.availableSlots
Review Comment:
`updateThenGetDiskInfos` now copies `newDisk.maxSlots/availableSlots` into
the existing `curDisk` for all disk types. For DFS/remote disks, the
worker-side `DiskInfo` often has `maxSlots/availableSlots` left at default (0),
while the master computes usable slots during registration via
`updateDiskSlots(...)`. This change can therefore zero out the master-computed
slots for remote disks on the first heartbeat, making them unusable. Consider
recomputing slots on the master for DFS disks as well, or avoid overwriting
`curDisk.maxSlots/availableSlots` for `storageType.isDFS` disks unless the
worker provides meaningful values.
```suggestion
        // Only overwrite slot-related fields when worker reports meaningful values.
if (newDisk.maxSlots > 0) {
curDisk.maxSlots = newDisk.maxSlots
}
if (newDisk.availableSlots > 0) {
curDisk.availableSlots = newDisk.availableSlots
}
```
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -157,9 +144,7 @@ static class UsableDiskInfo {
diskToWorkerMap.put(diskInfo, i);
if (diskInfo.actualUsableSpace() > 0
&& diskInfo.status().equals(DiskStatus.HEALTHY)
- && diskInfo.storageType() != StorageInfo.Type.HDFS
- && diskInfo.storageType() != StorageInfo.Type.S3
- && diskInfo.storageType() != StorageInfo.Type.OSS) {
+ && !diskInfo.storageType().isDFS()) {
usableDisks.add(diskInfo);
}
}));
Review Comment:
`usableDisks` was switched to `LinkedList`, but `placeDisksToGroups(...)`
sorts it and repeatedly uses `subList(start, end)`. With `LinkedList`, these
operations have extra traversal overhead compared to `ArrayList`. Unless
there’s a specific need for O(1) inserts in the middle (not apparent here),
prefer `ArrayList` for better performance.
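A standalone sketch of the sort-then-subList pattern (illustrative names only, not the actual `placeDisksToGroups` code): an `ArrayList` sorts in place over its backing array and `subList` views are O(1) to create, while a `LinkedList` must be dumped to an array for sorting and pays node traversal to reach each subList boundary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch (not Celeborn code): sort a list, then split it
// into fixed-size groups with subList.
public class GroupingDemo {
  static List<List<Integer>> toGroups(List<Integer> disks, int groupSize) {
    List<Integer> sorted = new ArrayList<>(disks); // copy into an array-backed list
    Collections.sort(sorted);
    List<List<Integer>> groups = new ArrayList<>();
    for (int start = 0; start < sorted.size(); start += groupSize) {
      int end = Math.min(start + groupSize, sorted.size());
      groups.add(sorted.subList(start, end)); // O(1) view on ArrayList
    }
    return groups;
  }

  public static void main(String[] args) {
    System.out.println(toGroups(List.of(5, 3, 1, 4, 2), 2)); // prints "[[1, 2], [3, 4], [5]]"
  }
}
```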
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]