This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a5dfd67d5 [CELEBORN-1034] Offer slots uses random range of available workers instead of shuffling
a5dfd67d5 is described below
commit a5dfd67d5b9bcb7d5da59f441ed1d60b4bc27cd3
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Wed Oct 18 17:00:03 2023 +0800
[CELEBORN-1034] Offer slots uses random range of available workers instead of shuffling
### What changes were proposed in this pull request?
In the original design, (primary worker, replica worker) pairs tend to stay stable. For example, in general scenarios (i.e. all workers are healthy and work well), the replica PartitionLocations for the primary PartitionLocations on Worker A will all be on Worker B. This way, one Worker has only one (or very few) connections to other workers' replicate netty servers.
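However, https://github.com/apache/incubator-celeborn/pull/1790 calls `Collections.shuffle(availableWorkers)`, so every slot request sees the available workers in a different order. As a result, a primary worker's replica peers change from request to request, and the number of replica connections increases dramatically.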
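A minimal sketch of why the order matters, under a simplified replication model that is an assumption rather than Celeborn's actual `SlotsAllocator` logic: within one request, the worker at position i of the offered list replicates to the worker at position (i + 1) % size. `ReplicaPeerModel`, `replicaPairs` and `peersPerWorker` are hypothetical names. With a stable order each worker keeps a single replica peer; with a fresh shuffle per request, its replica peers spread across the cluster.

```scala
import scala.util.Random

// Simplified, hypothetical replication model (not Celeborn's SlotsAllocator):
// within one request, position i of the offered list replicates to (i + 1) % size.
object ReplicaPeerModel {
  def replicaPairs(offered: Seq[String]): Seq[(String, String)] =
    if (offered.size < 2) Seq.empty
    else offered.indices.map(i => (offered(i), offered((i + 1) % offered.size)))

  // Simulates `requests` slot requests and collects, per primary worker,
  // the distinct replica peers it had to connect to.
  def peersPerWorker(
      workers: Vector[String],
      requests: Int,
      order: Vector[String] => Seq[String]): Map[String, Set[String]] =
    (1 to requests)
      .flatMap(_ => replicaPairs(order(workers)))
      .groupBy(_._1)
      .map { case (primary, ps) => primary -> ps.map(_._2).toSet }
}

val workers = Vector.tabulate(10)(i => s"worker-$i")
val rnd = new Random(42)

// Stable order: every request sees the same list, so each worker keeps one peer.
val stable = ReplicaPeerModel.peersPerWorker(workers, 100, ws => ws)
// Shuffled order: a fresh permutation per request, like Collections.shuffle.
val shuffled = ReplicaPeerModel.peersPerWorker(workers, 100, ws => rnd.shuffle(ws))

println(stable.values.map(_.size).max)   // 1
println(shuffled.values.map(_.size).max) // grows toward 9 (every other worker) as requests accumulate
```

Real peer counts also depend on rack awareness and the load-aware policy, which this model ignores.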

### Why are the changes needed?
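This PR refines the logic for selecting a limited number of workers. Instead of shuffling, the Master picks a random start index and takes a contiguous range of available workers, wrapping around to the beginning of the list if the range runs past the end.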
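The selection described above can be sketched as follows. `RangeSelection` and its methods are hypothetical; modulo indexing stands in for the two `subList` calls in the diff, and the replication model (position i replicates to position (i + 1) % size) is a simplifying assumption, not Celeborn's `SlotsAllocator` logic. Under that model a worker replicates either to its successor in the full list or, when it ends a range, to that range's first worker, so with a fixed range size it sees at most two distinct replica peers.

```scala
import scala.util.Random

object RangeSelection {
  // Hypothetical helper: picks `numWorkers` consecutive entries of `available`,
  // starting at a random index and wrapping around to index 0 past the end.
  def selectRange[T](available: IndexedSeq[T], numWorkers: Int, rnd: Random): IndexedSeq[T] = {
    val n = available.size
    if (n == 0 || numWorkers <= 0) IndexedSeq.empty // Random.nextInt(0) would throw
    else {
      val k = math.min(numWorkers, n)
      val start = rnd.nextInt(n) // uniform start index in [0, n)
      (0 until k).map(offset => available((start + offset) % n))
    }
  }

  // Simplified, hypothetical model: position i replicates to (i + 1) % size.
  def replicaPairs[T](selected: IndexedSeq[T]): IndexedSeq[(T, T)] =
    if (selected.size < 2) IndexedSeq.empty
    else selected.indices.map(i => (selected(i), selected((i + 1) % selected.size)))
}

val workers = Vector.tabulate(8)(i => s"worker-$i")
val rnd = new Random(7)

// 1000 requests, each offered a random range of 3 workers.
val peers: Map[String, Set[String]] =
  (1 to 1000)
    .flatMap(_ => RangeSelection.replicaPairs(RangeSelection.selectRange(workers, 3, rnd)))
    .groupBy(_._1)
    .map { case (primary, ps) => primary -> ps.map(_._2).toSet }

// A range starting at index 6 wraps to Vector(worker-6, worker-7, worker-0).
println(peers.values.map(_.size).max) // never more than 2 under this model
```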
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #1975 from waitinfuture/1034.
Lead-authored-by: zky.zhoukeyong <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../celeborn/service/deploy/master/Master.scala | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 18a5b7560..a2226b6b0 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -615,22 +615,31 @@ private[celeborn] class Master(
val numReducers = requestSlots.partitionIdList.size()
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)
- var availableWorkers = workersAvailable()
- Collections.shuffle(availableWorkers)
+ val availableWorkers = workersAvailable()
+ val numAvailableWorkers = availableWorkers.size()
val numWorkers = Math.min(
Math.max(
if (requestSlots.shouldReplicate) 2 else 1,
if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
- availableWorkers.size())
- availableWorkers = availableWorkers.subList(0, numWorkers)
+ numAvailableWorkers)
+ val startIndex = Random.nextInt(numAvailableWorkers)
+ val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
+ selectedWorkers.addAll(availableWorkers.subList(
+ startIndex,
+ Math.min(numAvailableWorkers, startIndex + numWorkers)))
+ if (startIndex + numWorkers > numAvailableWorkers) {
+ selectedWorkers.addAll(availableWorkers.subList(
+ 0,
+ startIndex + numWorkers - numAvailableWorkers))
+ }
// offer slots
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
statusSystem.workers.synchronized {
if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) {
SlotsAllocator.offerSlotsLoadAware(
- availableWorkers,
+ selectedWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
@@ -641,7 +650,7 @@ private[celeborn] class Master(
loadAwareFetchTimeWeight)
} else {
SlotsAllocator.offerSlotsRoundRobin(
- availableWorkers,
+ selectedWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware)
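The `numWorkers` expression in the diff raises the requested worker count to at least 2 when replication is enabled (1 otherwise), caps it at `slotsAssignMaxWorkers` (further capped by `requestSlots.maxWorkers` when that is positive), and finally limits it to the number of available workers. A standalone sketch of that arithmetic, where `NumWorkersClamp` and its parameter names are hypothetical stand-ins for the `Master` fields:

```scala
// Hypothetical standalone version of the numWorkers computation in the diff.
object NumWorkersClamp {
  def numWorkers(
      shouldReplicate: Boolean,   // stands in for requestSlots.shouldReplicate
      requestedMaxWorkers: Int,   // stands in for requestSlots.maxWorkers
      slotsAssignMaxWorkers: Int, // stands in for the Master's slotsAssignMaxWorkers
      numAvailableWorkers: Int): Int = {
    val lowerBound = if (shouldReplicate) 2 else 1
    // A non-positive per-request limit means "use the Master-wide limit".
    val cap =
      if (requestedMaxWorkers <= 0) slotsAssignMaxWorkers
      else math.min(slotsAssignMaxWorkers, requestedMaxWorkers)
    math.min(math.max(lowerBound, cap), numAvailableWorkers)
  }
}

println(NumWorkersClamp.numWorkers(true, 0, 10000, 50)) // 50
println(NumWorkersClamp.numWorkers(true, 1, 10000, 50)) // 2
println(NumWorkersClamp.numWorkers(false, 5, 3, 50))    // 3
```

With no available workers the result is 0, and `Random.nextInt(0)` throws `IllegalArgumentException`, so a standalone caller would need to handle that case before drawing a start index; whether `Master` guards against it elsewhere is not visible in this diff.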