This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 60871750e [CELEBORN-1136] Support policy for master to assign slots
fallback to roundrobin with no available slots
60871750e is described below
commit 60871750e4c21360054584c4ba5f10809ee172f2
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 22 14:08:06 2023 +0800
[CELEBORN-1136] Support policy for master to assign slots fallback to
roundrobin with no available slots
### What changes were proposed in this pull request?
`SlotsAllocator` now supports a policy for the master to fall back to
roundrobin slot assignment when there are no available slots.
### Why are the changes needed?
When the selected workers have no available slots, the loadaware policy
could throw `MasterNotLeaderException` (wrapping the underlying
`IllegalArgumentException: bound must be positive` shown below). The master's
slot assignment policy should therefore fall back to roundrobin when there are
no available slots. This situation can occur when the
partition size has increased a lot in a short period of time.
```
Caused by: org.apache.celeborn.common.haclient.MasterNotLeaderException:
Master:xx.xx.xx.xx:9099 is not the leader. Suggested leader is
Master:xx.xx.xx.xx:9099. Exception:bound must be positive.
at
org.apache.celeborn.service.deploy.master.clustermeta.ha.HAHelper.sendFailure(HAHelper.java:58)
at
org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:236)
at
org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:314)
... 7 more
Caused by: java.lang.IllegalArgumentException: bound must be positive
at java.util.Random.nextInt(Random.java:388)
at
org.apache.celeborn.service.deploy.master.SlotsAllocator.roundRobin(SlotsAllocator.java:202)
at
org.apache.celeborn.service.deploy.master.SlotsAllocator.offerSlotsLoadAware(SlotsAllocator.java:151)
at
org.apache.celeborn.service.deploy.master.Master.$anonfun$handleRequestSlots$1(Master.scala:598)
at
org.apache.celeborn.common.metrics.source.AbstractSource.sample(AbstractSource.scala:199)
at
org.apache.celeborn.common.metrics.source.AbstractSource.sample(AbstractSource.scala:189)
at
org.apache.celeborn.service.deploy.master.Master.handleRequestSlots(Master.scala:587)
at
org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$12(Master.scala:314)
at
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at
org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:233)
... 8 more
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`SlotsAllocatorSuiteJ#testAllocateSlotsWithNoAvailableSlots`
Closes #2108 from SteNicholas/CELEBORN-1136.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../scala/org/apache/celeborn/common/meta/DeviceInfo.scala | 11 ++++++++++-
.../celeborn/service/deploy/master/SlotsAllocator.java | 10 ++++++----
.../celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java | 12 ++++++++++++
3 files changed, 28 insertions(+), 5 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index 6bd49a1a0..56933a2cf 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -92,8 +92,17 @@ class DiskInfo(
avgFetchTime = fetchTimeMetrics.getAverage()
}
+ /**
+ * Returns the available slots of the disk calculated by maxSlots minus
activeSlots.
+ * Returns zero for the negative slots calculated.
+ *
+ * <b>Note:</b>`maxSlots` is calculated by actualUsableSpace divided
estimatedPartitionSize.
+ * Meanwhile, `activeSlots` include slots reserved.
+ *
+ * @return the available slots of the disk.
+ */
def availableSlots(): Long = this.synchronized {
- maxSlots - activeSlots
+ math.max(maxSlots - activeSlots, 0L)
}
def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 2ee40de63..e964e57e5 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -131,16 +131,18 @@ public class SlotsAllocator {
}
}));
- boolean shouldFallback =
+ boolean noUsableDisks =
usableDisks.isEmpty()
|| (shouldReplicate
&& (usableDisks.size() == 1
||
usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
+ boolean noAvailableSlots =
usableDisks.stream().mapToLong(DiskInfo::availableSlots).sum() <= 0;
- if (shouldFallback) {
+ if (noUsableDisks || noAvailableSlots) {
logger.warn(
- "offer slots for {} fallback to roundrobin because there is no
usable disks",
- StringUtils.join(partitionIds, ','));
+ "offer slots for {} fallback to roundrobin because there is no {}",
+ StringUtils.join(partitionIds, ','),
+ noUsableDisks ? "usable disks" : "available slots");
return offerSlotsRoundRobin(
workers, partitionIds, shouldReplicate, shouldRackAware,
availableStorageTypes);
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 3300f6de2..92f2ecfbd 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -334,4 +334,16 @@ public class SlotsAllocatorSuiteJ {
final boolean shouldReplicate = true;
checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true);
}
+
+ @Test
+ public void testAllocateSlotsWithNoAvailableSlots() {
+ final List<WorkerInfo> workers = prepareWorkers(true);
+ // Simulates no available slots behavior with greatly changed
estimatedPartitionSize for workers
+ // with usable disks.
+ workers.forEach(workerInfo ->
workerInfo.updateDiskMaxSlots(Long.MAX_VALUE));
+ final List<Integer> partitionIds = Collections.singletonList(0);
+ final boolean shouldReplicate = false;
+
+ check(workers, partitionIds, shouldReplicate, true);
+ }
}