This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 330c49837 [CELEBORN-1034] Offer slots uses random range of available
workers instead of shuffling
330c49837 is described below
commit 330c49837deb347da1f0decd7e7bd6f2460c9b2a
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Wed Oct 18 17:00:03 2023 +0800
[CELEBORN-1034] Offer slots uses random range of available workers instead
of shuffling
### What changes were proposed in this pull request?
In the original design, (primary worker, replica worker) pairs tend to stay
stable. For example, the replica PartitionLocations for the primary
PartitionLocations on Worker A will generally all be on Worker B, as long
as all workers are healthy and working well. This way, each Worker holds
only one (or very few) connections to other workers' replicate netty
servers.
However, https://github.com/apache/incubator-celeborn/pull/1790 calls
`Collections.shuffle(availableWorkers)`,
which causes the number of replica connections to increase dramatically.

### Why are the changes needed?
This PR refines the logic for selecting a limited number of workers: instead
of shuffling, the Master randomly picks a contiguous range of available
workers, wrapping around to the start of the list when the range runs past
the end.
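The selection described above can be sketched in isolation as follows. This is an illustrative rewrite, not the code in `Master.scala`: the object name `RandomRangeSketch` and the helpers `numWorkersToSelect`, `selectRange` and `selectRandomRange` are hypothetical, and plain Scala collections stand in for the `java.util.List[WorkerInfo]` the Master uses. The sketch also returns an empty result when no workers are available, because `Random.nextInt` rejects a non-positive bound; whether the Master can reach this path with an empty list is not visible in the diff.

```scala
import scala.util.Random

object RandomRangeSketch {

  /** Clamp the worker count the same way the diff does:
    * at least 1 (2 with replication), at most the configured/requested cap,
    * and never more than the number of available workers.
    */
  def numWorkersToSelect(
      shouldReplicate: Boolean,
      requestedMax: Int,
      configuredMax: Int,
      numAvailable: Int): Int = {
    val floor = if (shouldReplicate) 2 else 1
    val cap =
      if (requestedMax <= 0) configuredMax
      else math.min(configuredMax, requestedMax)
    math.min(math.max(floor, cap), numAvailable)
  }

  /** Take `count` consecutive elements starting at `startIndex`,
    * wrapping around to index 0 when the range runs past the end.
    */
  def selectRange[A](
      available: IndexedSeq[A],
      startIndex: Int,
      count: Int): IndexedSeq[A] = {
    val n = available.size
    require(count >= 0 && count <= n, "count must be in [0, available.size]")
    if (n == 0) IndexedSeq.empty[A]
    else {
      require(startIndex >= 0 && startIndex < n, "startIndex out of range")
      // Part of the range that fits before the end of the list.
      val head = available.slice(startIndex, math.min(n, startIndex + count))
      // Remainder, taken from the front of the list.
      val tail =
        if (startIndex + count > n) available.slice(0, startIndex + count - n)
        else IndexedSeq.empty[A]
      head ++ tail
    }
  }

  /** Pick a random start index, then take a wrapped range of `count` elements. */
  def selectRandomRange[A](
      available: IndexedSeq[A],
      count: Int,
      rng: Random = new Random()): IndexedSeq[A] =
    if (available.isEmpty) IndexedSeq.empty[A]
    else selectRange(available, rng.nextInt(available.size), count)
}
```

Because the selected workers keep their relative order from the available list, neighbouring workers stay neighbours in most selections, whereas a full shuffle reorders them on every request.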
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #1975 from waitinfuture/1034.
Lead-authored-by: zky.zhoukeyong <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit a5dfd67d5b9bcb7d5da59f441ed1d60b4bc27cd3)
Signed-off-by: mingji <[email protected]>
---
.../celeborn/service/deploy/master/Master.scala | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 79e1b9a37..b134781b8 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -615,22 +615,31 @@ private[celeborn] class Master(
val numReducers = requestSlots.partitionIdList.size()
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId,
requestSlots.shuffleId)
- var availableWorkers = workersAvailable()
- Collections.shuffle(availableWorkers)
+ val availableWorkers = workersAvailable()
+ val numAvailableWorkers = availableWorkers.size()
val numWorkers = Math.min(
Math.max(
if (requestSlots.shouldReplicate) 2 else 1,
if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
- availableWorkers.size())
- availableWorkers = availableWorkers.subList(0, numWorkers)
+ numAvailableWorkers)
+ val startIndex = Random.nextInt(numAvailableWorkers)
+ val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
+ selectedWorkers.addAll(availableWorkers.subList(
+ startIndex,
+ Math.min(numAvailableWorkers, startIndex + numWorkers)))
+ if (startIndex + numWorkers > numAvailableWorkers) {
+ selectedWorkers.addAll(availableWorkers.subList(
+ 0,
+ startIndex + numWorkers - numAvailableWorkers))
+ }
// offer slots
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME,
s"offerSlots-${Random.nextInt()}") {
statusSystem.workers.synchronized {
if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE &&
!hasHDFSStorage) {
SlotsAllocator.offerSlotsLoadAware(
- availableWorkers,
+ selectedWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
@@ -641,7 +650,7 @@ private[celeborn] class Master(
loadAwareFetchTimeWeight)
} else {
SlotsAllocator.offerSlotsRoundRobin(
- availableWorkers,
+ selectedWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware)