This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 26843ee2e [CELEBORN-1639] Fix SlotsAllocatorRackAwareSuiteJ UT
26843ee2e is described below
commit 26843ee2e6ac427ba9dad9def2e880891b33e132
Author: wankunde <[email protected]>
AuthorDate: Sat Oct 12 17:41:40 2024 +0800
[CELEBORN-1639] Fix SlotsAllocatorRackAwareSuiteJ UT
### What changes were proposed in this pull request?
In offerSlotsRoundRobinWithRackAware and
offerSlotsRoundRobinWithRackAwareWithoutMappingFile method of
SlotsAllocatorRackAwareSuiteJ UT, the result slots is empty, so they can not
test the slots allocation with rack aware is true.
### Why are the changes needed?
To fix the exists UT
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Fix UT methods: `offerSlotsRoundRobinWithRackAware()` and
`offerSlotsRoundRobinWithRackAwareWithoutMappingFile()`
Closes #2800 from wankunde/slots_allocator2.
Authored-by: wankunde <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 991f1c23b6dd2defecb365444101ac7bf11e3ccd)
Signed-off-by: mingji <[email protected]>
---
.../master/SlotsAllocatorRackAwareSuiteJ.java | 26 ++++++++++++++++------
1 file changed, 19 insertions(+), 7 deletions(-)
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
index 6c1627fff..d8d7bfdd1 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import scala.Tuple2;
+import scala.collection.mutable.ArrayBuffer;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.TableMapping;
@@ -91,6 +92,9 @@ public class SlotsAllocatorRackAwareSuiteJ {
resolver.resolve(location.getPeer().getHost()).getNetworkLocation());
}
};
+ Integer locationsCount =
+ slots.values().stream().map(tup -> tup._1.size() +
tup._2.size()).reduce(0, Integer::sum);
+ Assert.assertEquals((long) locationsCount, partitionIds.size() * 2);
slots.values().stream().map(Tuple2::_1).flatMap(Collection::stream).forEach(assertCustomer);
}
@@ -131,17 +135,26 @@ public class SlotsAllocatorRackAwareSuiteJ {
resolver.resolve(location.getPeer().getHost()).getNetworkLocation());
}
};
+ Integer locationsCount =
+ slots.values().stream().map(tup -> tup._1.size() +
tup._2.size()).reduce(0, Integer::sum);
+ Assert.assertEquals((long) locationsCount, partitionIds.size() * 2);
slots.values().stream().map(Tuple2::_1).flatMap(Collection::stream).forEach(assertConsumer);
}
private List<WorkerInfo> prepareWorkers(CelebornRackResolver resolver) {
ArrayList<WorkerInfo> workers = new ArrayList<>(3);
- workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 212, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 212, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 212, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host4", 9, 10, 110, 113, 212, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host5", 9, 11, 111, 114, 212, new HashMap<>(),
null));
- workers.add(new WorkerInfo("host6", 9, 12, 112, 115, 212, new HashMap<>(),
null));
+ ArrayBuffer<File> files = new ArrayBuffer<>();
+ files.$plus$eq(new File("/mnt/disk/1"));
+ files.$plus$eq(new File("/mnt/disk/2"));
+ HashMap<String, DiskInfo> diskInfos = new HashMap<>();
+ diskInfos.put(
+ "disk1", new DiskInfo("/mnt/disk/0", 1000, 1000, 1000, 1000,
files.toList(), null));
+ workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 212, diskInfos,
null));
+ workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 212, diskInfos,
null));
+ workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 212, diskInfos,
null));
+ workers.add(new WorkerInfo("host4", 9, 10, 110, 113, 212, diskInfos,
null));
+ workers.add(new WorkerInfo("host5", 9, 11, 111, 114, 212, diskInfos,
null));
+ workers.add(new WorkerInfo("host6", 9, 12, 112, 115, 212, diskInfos,
null));
workers.forEach(
new Consumer<WorkerInfo>() {
@@ -217,7 +230,6 @@ public class SlotsAllocatorRackAwareSuiteJ {
List<SlotReplicaAllocatorTestCase> allTests =
getSlotReplicaAllocatorTestCases();
for (final SlotReplicaAllocatorTestCase test : allTests) {
-
final int numPartitions = test.getNumPartitions();
long maxValue = Long.MIN_VALUE;
List<WorkerInfo> maxValueWorkers = null;