mridulm commented on code in PR #2119:
URL: 
https://github.com/apache/incubator-celeborn/pull/2119#discussion_r1412725910


##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -666,23 +666,32 @@ private[celeborn] class Master(
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, 
requestSlots.shuffleId)
 
     val availableWorkers = workersAvailable()
+    // reply false if all workers are unavailable
+    if (availableWorkers.isEmpty) {
+      logError(
+        s"Non available workers, offer slots for $numReducers reducers of 
$shuffleKey failed!")

Review Comment:
   nit: 
   ```suggestion
           s"No available workers, offer slots for $numReducers reducers for 
$shuffleKey failed!")
   ```



##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -666,23 +666,32 @@ private[celeborn] class Master(
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, 
requestSlots.shuffleId)
 
     val availableWorkers = workersAvailable()
+    // reply false if all workers are unavailable
+    if (availableWorkers.isEmpty) {
+      logError(
+        s"Non available workers, offer slots for $numReducers reducers of 
$shuffleKey failed!")
+      context.reply(RequestSlotsResponse(StatusCode.SLOT_NOT_AVAILABLE, new 
WorkerResource()))
+      return
+    }
+
     val numAvailableWorkers = availableWorkers.size()
     val numWorkers = Math.min(
       Math.max(
         if (requestSlots.shouldReplicate) 2 else 1,
         if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
         else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
       numAvailableWorkers)
+
+    // We treated availableWorkers as a Circular Queue here.
     val startIndex = Random.nextInt(numAvailableWorkers)
+    val endIndex = startIndex + numWorkers
+
     val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
-    selectedWorkers.addAll(availableWorkers.subList(
-      startIndex,
-      Math.min(numAvailableWorkers, startIndex + numWorkers)))
-    if (startIndex + numWorkers > numAvailableWorkers) {
-      selectedWorkers.addAll(availableWorkers.subList(
-        0,
-        startIndex + numWorkers - numAvailableWorkers))
+    for (index <- startIndex until endIndex) {
+      val realIndex = index % numAvailableWorkers
+      selectedWorkers.add(availableWorkers.get(realIndex))
     }
+

Review Comment:
   Wont this not be slower ? The range check is done for each `add`, and we are 
not able to leverage bulk add from sublist to array list as well.
   
   The earlier code is definitely a bit dense - but that can be fixed by simply 
pulling out variables and making it cleaner ?
   
   For example, something like (strawman proposal):
   ```
   val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
   
   def copySublist(from: Int): Unit = {
     val numEntriesToCopy = Math.min(numAvailableWorkers - from, numWorkers - 
selectedWorkers.size)
     selectedWorkers.addAll(availableWorkers.subList(from, from + 
numEntriesToCopy)
   }
   
   copySublist(startIndex)
   if (selectedWorkers.size < numWorkers) {
     // wrap around to beginning to copy rest.
     copySublist(0)
   }
   ```
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to