mridulm commented on code in PR #2119:
URL: https://github.com/apache/incubator-celeborn/pull/2119#discussion_r1412851397


##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -666,23 +666,32 @@ private[celeborn] class Master(
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)
 
     val availableWorkers = workersAvailable()
+    // reply false if all workers are unavailable
+    if (availableWorkers.isEmpty) {
+      logError(
+        s"Non available workers, offer slots for $numReducers reducers of $shuffleKey failed!")
+      context.reply(RequestSlotsResponse(StatusCode.SLOT_NOT_AVAILABLE, new WorkerResource()))
+      return
+    }
+
     val numAvailableWorkers = availableWorkers.size()
     val numWorkers = Math.min(
       Math.max(
         if (requestSlots.shouldReplicate) 2 else 1,
         if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
         else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
       numAvailableWorkers)
+
+    // We treated availableWorkers as a Circular Queue here.
     val startIndex = Random.nextInt(numAvailableWorkers)
+    val endIndex = startIndex + numWorkers
+
     val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
-    selectedWorkers.addAll(availableWorkers.subList(
-      startIndex,
-      Math.min(numAvailableWorkers, startIndex + numWorkers)))
-    if (startIndex + numWorkers > numAvailableWorkers) {
-      selectedWorkers.addAll(availableWorkers.subList(
-        0,
-        startIndex + numWorkers - numAvailableWorkers))
+    for (index <- startIndex until endIndex) {
+      val realIndex = index % numAvailableWorkers
+      selectedWorkers.add(availableWorkers.get(realIndex))
     }
+

Review Comment:
   I wrote up a quick benchmark based on your code above, @xleoken.
   I kept it as-is, though I am sure some modifications (like using ArrayList 
instead of LinkedList) will speed things up.
   
   
   TL;DR: the index-based loop (indexTest) is ~90x slower than the subList version (sublistTest).
   
   Result "com.example.benchmarks.SublistBenchmark.sublistTest":
     3131.106 ±(99.9%) 295.281 ns/op [Average]
     (min, avg, max) = (2785.079, 3131.106, 3671.674), stdev = 394.191
     CI (99.9%): [2835.825, 3426.387] (assumes normal distribution)
   
   
   Result "com.example.benchmarks.SublistBenchmark.indexTest":
     283484.145 ±(99.9%) 818.322 ns/op [Average]
     (min, avg, max) = (281715.260, 283484.145, 285482.984), stdev = 1092.437
     CI (99.9%): [282665.822, 284302.467] (assumes normal distribution)
   
   
   
   
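   ```
   import java.util.ArrayList;
   import java.util.LinkedList;
   import java.util.List;
   import java.util.Random;
   import java.util.concurrent.TimeUnit;
   
   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Level;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.Setup;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.Threads;
   import org.openjdk.jmh.infra.Blackhole;
   
   @BenchmarkMode(Mode.AverageTime)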
   @OutputTimeUnit(TimeUnit.NANOSECONDS)
   @State(Scope.Thread)
   @Threads(1)
   public class SublistBenchmark {
   
       @State(Scope.Benchmark)
       public static class InputState {
   
           // So that both tests are using the same distribution
           private Random random;
           private List<String> list;
   
           Random getRandom() {
               return random;
           }
   
           List<String> getList() {
               return list;
           }
   
           @Setup(Level.Invocation)
           public void setUp() {
               random = new Random(42);
               list = new LinkedList<String>();
               for (int i = 0;i < 2000; i ++) {
                   list.add("worker-" + i);
               }
           }
       }
   
       @Benchmark
       public void sublistTest(Blackhole blackhole, InputState state) {
           List<String> data = state.getList();
           Random random = state.getRandom();
   
           int startIndex = random.nextInt(data.size());
           int size = data.size();
   
           ArrayList<String> selectedWorkers = new ArrayList<>(size);
           selectedWorkers.addAll(data.subList(startIndex, Math.min(size, startIndex + 200)));
           if (startIndex + 200 > size) {
               selectedWorkers.addAll(data.subList(0, startIndex + 200 - size));
           }
   
           blackhole.consume(selectedWorkers);
       }
   
       @Benchmark
       public void indexTest(Blackhole blackhole, InputState state) {
           List<String> data = state.getList();
           Random random = state.getRandom();
   
           int startIndex = random.nextInt(data.size());
           int endIndex = startIndex + 200;
           int size = data.size();
   
           ArrayList<String> selectedWorkers = new ArrayList<>(size);
           // data is a LinkedList here, so each get(index) walks the list: O(n) per call.
           for (int i = startIndex; i < endIndex; i ++) {
               int index = i % size;
               selectedWorkers.add(data.get(index));
           }
   
           blackhole.consume(selectedWorkers);
       }
   
       public static void main(String[] args) throws Exception {
           org.openjdk.jmh.Main.main(args);
       }
   }
   ```
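
   For reference, the two selection strategies compared in the benchmark can be reduced to the minimal sketch below. It is not part of the benchmark or of the PR: the class name `CircularSelect` and both method names are hypothetical, and it assumes `0 <= startIndex < workers.size()` and `count <= workers.size()`, as the surrounding Master code guarantees for `numWorkers`. Both methods return the same workers in the same order; they differ only in how they read the source list, which is what matters when that list is a LinkedList.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.LinkedList;
   import java.util.List;

   public class CircularSelect {

       // Index-based selection, as in the PR diff. Each get() is O(1) on an
       // ArrayList but O(n) on a LinkedList, because every call walks the list.
       static <T> List<T> selectByIndex(List<T> workers, int startIndex, int count) {
           int size = workers.size();
           List<T> selected = new ArrayList<>(count);
           for (int i = startIndex; i < startIndex + count; i++) {
               selected.add(workers.get(i % size));
           }
           return selected;
       }

       // subList-based selection, as in the code the PR replaces. addAll reads
       // each range in one pass, so it does not rely on fast random access.
       static <T> List<T> selectBySubList(List<T> workers, int startIndex, int count) {
           int size = workers.size();
           List<T> selected = new ArrayList<>(count);
           selected.addAll(workers.subList(startIndex, Math.min(size, startIndex + count)));
           if (startIndex + count > size) {
               // Wrap around to the head of the list for the remainder.
               selected.addAll(workers.subList(0, startIndex + count - size));
           }
           return selected;
       }

       public static void main(String[] args) {
           List<String> workers =
               new LinkedList<>(Arrays.asList("w0", "w1", "w2", "w3", "w4"));
           // Start at index 3 and take 4 workers, wrapping past the end.
           System.out.println(selectByIndex(workers, 3, 4));   // [w3, w4, w0, w1]
           System.out.println(selectBySubList(workers, 3, 4)); // [w3, w4, w0, w1]
       }
   }
   ```

   If `workersAvailable()` returns a random-access list such as ArrayList, the per-call cost of `get` is constant and the gap measured above would be expected to shrink considerably; that is an expectation, not a measured result.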



