xleoken commented on code in PR #2119:
URL:
https://github.com/apache/incubator-celeborn/pull/2119#discussion_r1413278837
##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -666,23 +666,32 @@ private[celeborn] class Master(
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId,
requestSlots.shuffleId)
val availableWorkers = workersAvailable()
+ // reply false if all workers are unavailable
+ if (availableWorkers.isEmpty) {
+ logError(
+ s"Non available workers, offer slots for $numReducers reducers of
$shuffleKey failed!")
+ context.reply(RequestSlotsResponse(StatusCode.SLOT_NOT_AVAILABLE, new
WorkerResource()))
+ return
+ }
+
val numAvailableWorkers = availableWorkers.size()
val numWorkers = Math.min(
Math.max(
if (requestSlots.shouldReplicate) 2 else 1,
if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
numAvailableWorkers)
+
+ // We treated availableWorkers as a Circular Queue here.
val startIndex = Random.nextInt(numAvailableWorkers)
+ val endIndex = startIndex + numWorkers
+
val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
- selectedWorkers.addAll(availableWorkers.subList(
- startIndex,
- Math.min(numAvailableWorkers, startIndex + numWorkers)))
- if (startIndex + numWorkers > numAvailableWorkers) {
- selectedWorkers.addAll(availableWorkers.subList(
- 0,
- startIndex + numWorkers - numAvailableWorkers))
+ for (index <- startIndex until endIndex) {
+ val realIndex = index % numAvailableWorkers
+ selectedWorkers.add(availableWorkers.get(realIndex))
}
+
Review Comment:
hi @mridulm, thanks for your detail response 👍, you are very correct about
the algorithm performance itself, and I learned about performance testing
framework from your suggestion.
As we known, celeborn is designed for remote shuffle, which usually used in
OLAP scenarios, so that the very small performance gap is acceptable from my
side, and the focus of this patch is to improve readability.
and we can see that there is a similar algorithm in the
`Master#handleRequestSlots`.
https://github.com/apache/incubator-celeborn/blob/1c7cd1bd1392c8d3fd4396c539696dc79835f8dc/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala#L738-L750
##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -666,23 +666,32 @@ private[celeborn] class Master(
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId,
requestSlots.shuffleId)
val availableWorkers = workersAvailable()
+ // reply false if all workers are unavailable
+ if (availableWorkers.isEmpty) {
+ logError(
+ s"Non available workers, offer slots for $numReducers reducers of
$shuffleKey failed!")
+ context.reply(RequestSlotsResponse(StatusCode.SLOT_NOT_AVAILABLE, new
WorkerResource()))
+ return
+ }
+
val numAvailableWorkers = availableWorkers.size()
val numWorkers = Math.min(
Math.max(
if (requestSlots.shouldReplicate) 2 else 1,
if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
numAvailableWorkers)
+
+ // We treated availableWorkers as a Circular Queue here.
val startIndex = Random.nextInt(numAvailableWorkers)
+ val endIndex = startIndex + numWorkers
+
val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
- selectedWorkers.addAll(availableWorkers.subList(
- startIndex,
- Math.min(numAvailableWorkers, startIndex + numWorkers)))
- if (startIndex + numWorkers > numAvailableWorkers) {
- selectedWorkers.addAll(availableWorkers.subList(
- 0,
- startIndex + numWorkers - numAvailableWorkers))
+ for (index <- startIndex until endIndex) {
+ val realIndex = index % numAvailableWorkers
+ selectedWorkers.add(availableWorkers.get(realIndex))
}
+
Review Comment:
> IMO performance is also important - atleast for our deployment, where we
are running millions of shuffles a day for PB's of data, it is :-)
> [1]
Result "com.example.benchmarks.SublistBenchmark.sublistTest":
1017.635 ±(99.9%) 7.152 ns/op [Average]
(min, avg, max) = (1004.876, 1017.635, 1037.149), stdev = 9.548
CI (99.9%): [1010.483, 1024.787] (assumes normal distribution)
Result "com.example.benchmarks.SublistBenchmark.indexTest":
1868.689 ±(99.9%) 32.588 ns/op [Average]
(min, avg, max) = (1819.288, 1868.689, 1955.872), stdev = 43.505
CI (99.9%): [1836.101, 1901.278] (assumes normal distribution)
Hi @mridulm, as you have tested, we can see that `32.588ns/op` vs
`7.152ns/op`(based on LinkedList), seems that it can't affect the
handleRequestSlots performance.
e.g (based on LinkedList)
1 millions of shuffles, the delta is
100, 0000 * (32ns - 7ns) = 25000000 ns < 1s
BTW, the `availableWorkers` is ArrayList, the delta difference will be very
very small.
So for this patch, readability has a higher priority.

##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -666,23 +666,32 @@ private[celeborn] class Master(
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId,
requestSlots.shuffleId)
val availableWorkers = workersAvailable()
+ // reply false if all workers are unavailable
+ if (availableWorkers.isEmpty) {
+ logError(
+ s"Non available workers, offer slots for $numReducers reducers of
$shuffleKey failed!")
+ context.reply(RequestSlotsResponse(StatusCode.SLOT_NOT_AVAILABLE, new
WorkerResource()))
+ return
+ }
+
val numAvailableWorkers = availableWorkers.size()
val numWorkers = Math.min(
Math.max(
if (requestSlots.shouldReplicate) 2 else 1,
if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
numAvailableWorkers)
+
+ // We treated availableWorkers as a Circular Queue here.
val startIndex = Random.nextInt(numAvailableWorkers)
+ val endIndex = startIndex + numWorkers
+
val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
- selectedWorkers.addAll(availableWorkers.subList(
- startIndex,
- Math.min(numAvailableWorkers, startIndex + numWorkers)))
- if (startIndex + numWorkers > numAvailableWorkers) {
- selectedWorkers.addAll(availableWorkers.subList(
- 0,
- startIndex + numWorkers - numAvailableWorkers))
+ for (index <- startIndex until endIndex) {
+ val realIndex = index % numAvailableWorkers
+ selectedWorkers.add(availableWorkers.get(realIndex))
}
+
Review Comment:
@mridulm, thanks for sharing your thought. 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]