This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 71ead8784 [CELEBORN-872][MASTER] Extract the same allocation logic for
both loadaware and roundrobin
71ead8784 is described below
commit 71ead8784734eb8fc830cbdeb8d700c70f3808d7
Author: zwangsheng <[email protected]>
AuthorDate: Thu Aug 3 20:14:45 2023 +0800
[CELEBORN-872][MASTER] Extract the same allocation logic for both loadaware
and roundrobin
### What changes were proposed in this pull request?
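As the title says: extract the allocation logic shared by the load-aware and round-robin strategies into a single `locateSlots` helper.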
### Why are the changes needed?
Reduce duplicated code segments and improve code readability and
maintainability.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit Test
Closes #1786 from zwangsheng/CELEBORN-872.
Authored-by: zwangsheng <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../service/deploy/master/SlotsAllocator.java | 74 +++++++++++-----------
1 file changed, 36 insertions(+), 38 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 7d9ffab37..10fb31824 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -60,8 +60,6 @@ public class SlotsAllocator {
if (workers.size() < 2 && shouldReplicate) {
return new HashMap<>();
}
- Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
- new HashMap<>();
Map<WorkerInfo, List<UsableDiskInfo>> restrictions = new HashMap<>();
for (WorkerInfo worker : workers) {
List<UsableDiskInfo> usableDisks =
@@ -74,16 +72,7 @@ public class SlotsAllocator {
}
}
}
- List<Integer> remain =
- roundRobin(slots, partitionIds, workers, restrictions,
shouldReplicate, shouldRackAware);
- if (!remain.isEmpty()) {
- remain = roundRobin(slots, remain, workers, null, shouldReplicate,
shouldRackAware);
- }
-
- if (!remain.isEmpty()) {
- roundRobin(slots, remain, workers, null, shouldReplicate, false);
- }
- return slots;
+ return locateSlots(partitionIds, workers, restrictions, shouldReplicate,
shouldRackAware);
}
/**
@@ -139,36 +128,12 @@ public class SlotsAllocator {
initLoadAwareAlgorithm(diskGroupCount, diskGroupGradient);
}
- Map<WorkerInfo, List<UsableDiskInfo>> restriction =
+ Map<WorkerInfo, List<UsableDiskInfo>> restrictions =
getRestriction(
placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight,
fetchTimeWeight),
diskToWorkerMap,
shouldReplicate ? partitionIds.size() * 2 : partitionIds.size());
-
- Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
- new HashMap<>();
- List<Integer> remainPartitions =
- roundRobin(
- slots,
- partitionIds,
- new ArrayList<>(restriction.keySet()),
- restriction,
- shouldReplicate,
- shouldRackAware);
- if (!remainPartitions.isEmpty()) {
- remainPartitions =
- roundRobin(
- slots,
- remainPartitions,
- new ArrayList<>(workers),
- null,
- shouldReplicate,
- shouldRackAware);
- }
- if (!remainPartitions.isEmpty()) {
- roundRobin(slots, remainPartitions, new ArrayList<>(workers), null,
shouldReplicate, false);
- }
- return slots;
+ return locateSlots(partitionIds, workers, restrictions, shouldReplicate,
shouldRackAware);
}
private static StorageInfo getStorageInfo(
@@ -188,6 +153,39 @@ public class SlotsAllocator {
return storageInfo;
}
+ /**
+ * Progressive locate slots for all partitions <br>
+ * 1. try to allocate for all partitions under restrictions <br>
+ * 2. allocate remain partitions to all workers <br>
+ * 3. allocate remain partitions to all workers again without considering
rack aware <br>
+ */
+ private static Map<WorkerInfo, Tuple2<List<PartitionLocation>,
List<PartitionLocation>>>
+ locateSlots(
+ List<Integer> partitionIds,
+ List<WorkerInfo> workers,
+ Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
+ boolean shouldReplicate,
+ boolean shouldRackAware) {
+ Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
+ new HashMap<>();
+
+ List<Integer> remain =
+ roundRobin(
+ slots,
+ partitionIds,
+ new LinkedList<>(restrictions.keySet()),
+ restrictions,
+ shouldReplicate,
+ shouldRackAware);
+ if (!remain.isEmpty()) {
+ remain = roundRobin(slots, remain, workers, null, shouldReplicate,
shouldRackAware);
+ }
+ if (!remain.isEmpty()) {
+ roundRobin(slots, remain, workers, null, shouldReplicate, false);
+ }
+ return slots;
+ }
+
private static List<Integer> roundRobin(
Map<WorkerInfo, Tuple2<List<PartitionLocation>,
List<PartitionLocation>>> slots,
List<Integer> partitionIds,