This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.1 by this push:
new f7fdd173 [ISSUE-601] limit the max workers while offerSlots (#1096)
f7fdd173 is described below
commit f7fdd1733c430ea1f7e1db14af8e2debeec000f1
Author: jxysoft <[email protected]>
AuthorDate: Wed Dec 21 13:07:36 2022 +0800
[ISSUE-601] limit the max workers while offerSlots (#1096)
Co-authored-by: xianyao.jiang <[email protected]>
---
.../scala/com/aliyun/emr/rss/common/RssConf.scala | 18 +++
.../emr/rss/service/deploy/master/MasterUtil.java | 65 +++++++--
.../emr/rss/service/deploy/master/Master.scala | 3 +-
.../service/deploy/master/MasterUtilSuiteJ.java | 150 ++++++++++++++++++++-
4 files changed, 216 insertions(+), 20 deletions(-)
diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
index 1e3f6298..b41b48ff 100644
--- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
+++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
@@ -652,6 +652,24 @@ object RssConf extends Logging {
conf.getInt("rss.offer.slots.extra.size", 2)
}
+ def offerSlotsOrderByFreeSlots(conf: RssConf): Boolean = { // when true, MasterUtil.offerSlots sorts workers by free slots (descending) before truncating the candidate list
+ conf.getBoolean("rss.offer.slots.orderByFreeSlots", false)
+ }
+
+ def offerSlotsMaxWorkers(conf: RssConf): Int = {
+ // Upper bound on the workers considered by one offerSlots call; any value <= 0 (default -1) means unlimited.
+ conf.getInt("rss.offer.slots.maxWorkers", -1)
+ }
+
+ def offerSlotsMinWorkers(conf: RssConf): Int = { // lower bound on workers; MasterUtil.offerSlots raises it to at least 1 (2 when replicating) and caps it at the requested slot count
+ conf.getInt("rss.offer.slots.minWorkers", 1)
+ }
+
+ def offerSlotsMinPartitionsPerWorker(conf: RssConf): Long = {
+ // Caps the worker count at ceil(requestedSlots / value); any value <= 0 (default -1) disables this cap.
+ conf.getLong("rss.offer.slots.minPartitionsPerWorker", -1)
+ }
+
def shuffleWriterMode(conf: RssConf): String = {
conf.get("rss.shuffle.writer.mode", "hash")
}
diff --git
a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/MasterUtil.java
b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/MasterUtil.java
index 846921c2..39d8aee9 100644
---
a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/MasterUtil.java
+++
b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/MasterUtil.java
@@ -27,6 +27,7 @@ import java.util.Random;
import scala.Tuple2;
+import com.aliyun.emr.rss.common.RssConf;
import com.aliyun.emr.rss.common.meta.WorkerInfo;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;
@@ -50,22 +51,62 @@ public class MasterUtil {
String shuffleKey,
List<WorkerInfo> workers,
List<Integer> reduceIds,
- boolean shouldReplicate) {
- int[] oldEpochs = new int[reduceIds.size()];
- Arrays.fill(oldEpochs, -1);
- return offerSlots(shuffleKey, workers, reduceIds, oldEpochs,
shouldReplicate);
+ boolean shouldReplicate,
+ RssConf conf) {
+ if (workers.size() < 1 || workers.size() < 2 && shouldReplicate) {
+ return null;
+ }
+
+ int targetSlots = shouldReplicate ? reduceIds.size() * 2 :
reduceIds.size();
+
+ // get max number of workers
+ int maxWorkerNums = workers.size();
+ long offerSlotsMinPartitionsPerWorker =
RssConf.offerSlotsMinPartitionsPerWorker(conf);
+ if (offerSlotsMinPartitionsPerWorker > 0) {
+ int workerNums = (int) ((targetSlots +
+ offerSlotsMinPartitionsPerWorker - 1) /
offerSlotsMinPartitionsPerWorker);
+ maxWorkerNums = Math.min(maxWorkerNums, workerNums);
+ }
+
+ int offerSlotsMaxWorkers = RssConf.offerSlotsMaxWorkers(conf);
+ if (offerSlotsMaxWorkers > 0) {
+ maxWorkerNums = Math.min(maxWorkerNums, offerSlotsMaxWorkers);
+ }
+
+ int minWorkerNums = Math.min(targetSlots,
Math.max(RssConf.offerSlotsMinWorkers(conf), 1));
+ if (shouldReplicate) {
+ minWorkerNums = Math.max(2, minWorkerNums);
+ }
+ maxWorkerNums = Math.max(minWorkerNums, maxWorkerNums);
+
+ // Try the restricted candidate set first: only the first maxWorkerNums workers are offered.
+ if (maxWorkerNums < workers.size()) {
+ if (RssConf.offerSlotsOrderByFreeSlots(conf)) {
+ // Descending by free slots. Integer.compare avoids the int overflow that a
+ // subtraction-based comparator can hit. Note: this sorts the caller's list in place.
+ workers.sort((o1, o2) -> Integer.compare(o2.freeSlots(), o1.freeSlots()));
+ }
+ // subList returns a view over workers, not a copy.
+ List<WorkerInfo> newWorkers = workers.subList(0, maxWorkerNums);
+ Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> res =
+ doOfferSlots(newWorkers, reduceIds, shouldReplicate, targetSlots);
+ // null means the restricted set could not serve the request (e.g. too few free slots);
+ // in that case fall through to the unrestricted attempt below.
+ if (res != null) {
+ return res;
+ }
+ }
+
+ // fallback to original
+ return doOfferSlots(workers, reduceIds, shouldReplicate, targetSlots);
}
- public static Map<WorkerInfo, Tuple2<List<PartitionLocation>,
List<PartitionLocation>>>
- offerSlots(
- String shuffleKey,
+ private static Map<WorkerInfo, Tuple2<List<PartitionLocation>,
List<PartitionLocation>>>
+ doOfferSlots(
List<WorkerInfo> workers,
List<Integer> reduceIds,
- int[] oldEpochs,
- boolean shouldReplicate) {
- if (workers.size() < 2 && shouldReplicate) {
- return null;
- }
+ boolean shouldReplicate,
+ long targetSlots) {
+ long totalFreeSlots = workers.stream().mapToLong(i -> i.freeSlots()).sum();
+ if (totalFreeSlots < targetSlots) return null;
+
+ int[] oldEpochs = new int[reduceIds.size()];
+ Arrays.fill(oldEpochs, -1);
int masterInd = rand.nextInt(workers.size());
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
diff --git
a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
index 0d38d250..5012bab8 100644
---
a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
+++
b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
@@ -332,7 +332,8 @@ private[deploy] class Master(
shuffleKey,
workersNotBlacklisted(),
requestSlots.reduceIdList,
- requestSlots.shouldReplicate
+ requestSlots.shouldReplicate,
+ conf
)
}
diff --git
a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
index 33844425..61dae921 100644
---
a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
+++
b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/MasterUtilSuiteJ.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import com.aliyun.emr.rss.common.RssConf;
import scala.Tuple2;
import org.junit.Test;
@@ -31,6 +32,7 @@ import com.aliyun.emr.rss.common.meta.WorkerInfo;
import com.aliyun.emr.rss.common.protocol.PartitionLocation;
public class MasterUtilSuiteJ {
+ RssConf rssConf = new RssConf();
private List<WorkerInfo> prepareWorkers(int numSlots) {
ArrayList<WorkerInfo> workers = new ArrayList<>(3);
@@ -47,7 +49,7 @@ public class MasterUtilSuiteJ {
final List<Integer> reduceIds = Collections.emptyList();
final boolean shouldReplicate = true;
- check(0, 3, workers, reduceIds, shouldReplicate, true);
+ check(0, 3, workers, reduceIds, shouldReplicate, true, rssConf);
}
@Test
@@ -57,7 +59,7 @@ public class MasterUtilSuiteJ {
final List<Integer> reduceIds = Collections.singletonList(0);
final boolean shouldReplicate = true;
- check(2, 1, workers, reduceIds, shouldReplicate, true);
+ check(2, 1, workers, reduceIds, shouldReplicate, true, rssConf);
}
@Test
@@ -67,7 +69,7 @@ public class MasterUtilSuiteJ {
final List<Integer> reduceIds = Collections.singletonList(0);
final boolean shouldReplicate = false;
- check(1, 2, workers, reduceIds, shouldReplicate, true);
+ check(1, 2, workers, reduceIds, shouldReplicate, true, rssConf);
}
@Test
@@ -77,7 +79,7 @@ public class MasterUtilSuiteJ {
final List<Integer> reduceIds = Arrays.asList(0, 1);
final boolean shouldReplicate = false;
- check(2, 1, workers, reduceIds, shouldReplicate, true);
+ check(2, 1, workers, reduceIds, shouldReplicate, true, rssConf);
}
@Test
@@ -87,7 +89,140 @@ public class MasterUtilSuiteJ {
final List<Integer> reduceIds = Arrays.asList(0, 1, 2);
final boolean shouldReplicate = false;
- check(3, 0, workers, reduceIds, shouldReplicate, true);
+ check(3, 0, workers, reduceIds, shouldReplicate, true, rssConf);
+ }
+
+ // Offers 3 reduce ids, without replication, to three workers built with 1, 2 and 3 as
+ // their slot argument while rss.offer.slots.orderByFreeSlots is enabled.
+ // The @Test annotation was missing, so JUnit never executed this method.
+ @Test
+ public void testAllocateSlotsForOrderByFreeSlots() {
+ final List<WorkerInfo> workers = new ArrayList<>(3);
+ workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 1, null));
+ workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 2, null));
+ workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 3, null));
+ final List<Integer> reduceIds = Arrays.asList(0, 1, 2);
+ final boolean shouldReplicate = false;
+
+ RssConf conf = new RssConf();
+ conf.set("rss.offer.slots.orderByFreeSlots", "true");
+ // No worker limit is configured, so all three workers stay candidates.
+ // Expect 3 workers used; the second argument is presumably the remaining slot count (6 - 3).
+ check(3, 3, workers, reduceIds, shouldReplicate, true, conf);
+ }
+
+ @Test
+ public void testAllocateSlotsByReduceIdsWithoutReplicate() { // 10 workers x 3 slots, one slot per reduce id
+ RssConf conf = new RssConf();
+ conf.set("rss.offer.slots.minWorkers", "3");
+ conf.set("rss.offer.slots.maxWorkers", "6");
+ conf.set("rss.offer.slots.minPartitionsPerWorker", "2"); // worker cap starts at ceil(slots / 2)
+
+ List<Integer> reduceIds = null;
+ boolean shouldReplicate = false;
+ int totalSlots = 10 * 3;
+
+ List<WorkerInfo> workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(1);
+ check(1, totalSlots - 1, workers, reduceIds, shouldReplicate, true, conf); // minWorkers is capped by the slot count: 1 worker
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(3);
+ check(3, totalSlots - 3, workers, reduceIds, shouldReplicate, true, conf); // ceil(3/2)=2, raised to minWorkers=3
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(4);
+ check(3, totalSlots - 4, workers, reduceIds, shouldReplicate, true, conf); // ceil(4/2)=2, raised to minWorkers=3
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(6);
+ check(3, totalSlots - 6, workers, reduceIds, shouldReplicate, true, conf); // ceil(6/2)=3
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(7);
+ check(4, totalSlots - 7, workers, reduceIds, shouldReplicate, true, conf); // ceil(7/2)=4
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(12);
+ check(6, totalSlots - 12, workers, reduceIds, shouldReplicate, true, conf); // ceil(12/2)=6
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(13);
+ check(6, totalSlots - 13, workers, reduceIds, shouldReplicate, true, conf); // ceil(13/2)=7, capped at maxWorkers=6
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(18);
+ check(6, totalSlots - 18, workers, reduceIds, shouldReplicate, true, conf); // 6 workers x 3 slots = 18, still fits
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(19);
+ check(10, totalSlots - 19, workers, reduceIds, shouldReplicate, true,
conf); // 6 workers hold only 18 slots, so the offer falls back to all 10 workers
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(30);
+ check(10, totalSlots - 30, workers, reduceIds, shouldReplicate, true,
conf); // all 30 slots consumed across all 10 workers
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(31);
+ check(0, totalSlots, workers, reduceIds, shouldReplicate, false, conf); // 31 > 30 total slots: the offer is expected to fail
+ }
+
+ @Test
+ public void testAllocateSlotsByReduceIdsWithReplicate() { // 10 workers x 3 slots, two slots per reduce id
+ RssConf conf = new RssConf();
+ conf.set("rss.offer.slots.minWorkers", "3");
+ conf.set("rss.offer.slots.maxWorkers", "6");
+ conf.set("rss.offer.slots.minPartitionsPerWorker", "2"); // worker cap starts at ceil(slots / 2)
+
+ List<Integer> reduceIds = null;
+ boolean shouldReplicate = true;
+ int totalSlots = 10 * 3;
+
+ List<WorkerInfo> workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(1);
+ check(2, totalSlots - 2, workers, reduceIds, shouldReplicate, true, conf); // 2 slots; replication forces at least 2 workers
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(3);
+ check(3, totalSlots - 6, workers, reduceIds, shouldReplicate, true, conf); // 6 slots: ceil(6/2)=3
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(4);
+ check(4, totalSlots - 8, workers, reduceIds, shouldReplicate, true, conf); // 8 slots: ceil(8/2)=4
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(6);
+ check(6, totalSlots - 12, workers, reduceIds, shouldReplicate, true, conf); // 12 slots: ceil(12/2)=6
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(7);
+ check(6, totalSlots - 14, workers, reduceIds, shouldReplicate, true, conf); // 14 slots: 7 capped at maxWorkers=6
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(9);
+ check(6, totalSlots - 18, workers, reduceIds, shouldReplicate, true, conf); // 18 slots fit exactly on 6 workers
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(10);
+ check(10, totalSlots - 20, workers, reduceIds, shouldReplicate, true,
conf); // 20 slots exceed the 18 on 6 workers, so the offer falls back to all 10 workers
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(15);
+ check(10, totalSlots - 30, workers, reduceIds, shouldReplicate, true,
conf); // all 30 slots consumed across all 10 workers
+
+ workers = genWorkers(10, 3);
+ reduceIds = genReduceIds(16);
+ check(0, totalSlots, workers, reduceIds, shouldReplicate, false, conf); // 32 > 30 total slots: the offer is expected to fail
+ }
+
+ // Builds num workers named host1..hostN; slotPerWorker is passed as the sixth
+ // WorkerInfo constructor argument, all other arguments are fixed.
+ private List<WorkerInfo> genWorkers(int num, int slotPerWorker) {
+ final List<WorkerInfo> generated = new ArrayList<>(num);
+ for (int seq = 0; seq < num; seq++) {
+ final String host = "host" + (seq + 1);
+ generated.add(new WorkerInfo(host, 9, 10, 110, 113, slotPerWorker, null));
+ }
+ return generated;
+ }
+
+ // Returns the reduce ids 0..num-1 in ascending order (empty when num <= 0).
+ private List<Integer> genReduceIds(int num) {
+ // Diamond operator instead of the raw ArrayList type, so no unchecked conversion.
+ List<Integer> res = new ArrayList<>(num);
+ for (int i = 0; i < num; i++) {
+ res.add(i);
+ }
+ return res;
+ }
private void check(
@@ -96,10 +231,11 @@ public class MasterUtilSuiteJ {
List<WorkerInfo> workers,
List<Integer> reduceIds,
boolean shouldReplicate,
- boolean expectSuccess) {
+ boolean expectSuccess,
+ RssConf rssConf) {
String shuffleKey = "appId-1";
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
- MasterUtil.offerSlots(shuffleKey, workers, reduceIds, shouldReplicate);
+ MasterUtil.offerSlots(shuffleKey, workers, reduceIds, shouldReplicate,
rssConf);
if (expectSuccess) {
assert usedWorkers == slots.size() : "Offer slots, expect to return "