Repository: incubator-reef
Updated Branches:
  refs/heads/master 4d5ca8b07 -> 073f18e40


[REEF-518] Smarter VortexWorker-Tasklet scheduling

  This addresses the issue by
  * Creating a pluggable SchedulingPolicy for choosing a worker to launch a 
tasklet onto.
  * Implementing two types of SchedulingPolicy: FirstFit and Random.
  * Creating unit tests for testing these two policies.
  * Making changes in related classes to configure and use the SchedulingPolicy.

  Zhengping Qian implemented the core logic. John Yang wrote tests, code for 
configurations, and JavaDocs.
  Note that we changed the name of the class PendingTaskletScheduler to 
PendingTaskletLauncher to
  make it clearer that the worker-tasklet matching is the responsibility of 
RunningWorkers.
  In the code you can see that SchedulingPolicy is injected into RunningWorkers 
and used there.

  We expect that later the PendingTaskletLauncher will be responsible for 
addressing REEF-500.

JIRA:
  [REEF-518](https://issues.apache.org/jira/browse/REEF-518)

Pull Request:
  Closes #427


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/073f18e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/073f18e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/073f18e4

Branch: refs/heads/master
Commit: 073f18e4043cb598d7bca129716cf4b3400b6f7c
Parents: 4d5ca8b
Author: John Yang <johnya...@apache.org>
Authored: Wed Aug 26 14:50:52 2015 +0800
Committer: Byung-Gon Chun <bgc...@apache.org>
Committed: Sun Aug 30 12:45:23 2015 +0900

----------------------------------------------------------------------
 .../reef/vortex/driver/DefaultVortexMaster.java |   9 +-
 .../vortex/driver/FirstFitSchedulingPolicy.java | 151 +++++++++++++
 .../vortex/driver/PendingTaskletLauncher.java   |  59 +++++
 .../vortex/driver/PendingTaskletScheduler.java  |  59 -----
 .../vortex/driver/RandomSchedulingPolicy.java   | 105 +++++++++
 .../reef/vortex/driver/RunningWorkers.java      | 219 +++++++++----------
 .../reef/vortex/driver/SchedulingPolicy.java    |  60 +++++
 .../reef/vortex/driver/VortexConfHelper.java    |   4 +-
 .../apache/reef/vortex/driver/VortexDriver.java |   4 +-
 .../reef/vortex/driver/VortexLauncher.java      |  12 +-
 .../reef/vortex/driver/VortexMasterConf.java    |  18 +-
 .../reef/vortex/driver/VortexWorkerManager.java |  27 ++-
 .../reef/vortex/examples/addone/AddOne.java     |   2 +-
 .../reef/vortex/examples/hello/HelloVortex.java |   2 +-
 .../vortex/driver/DefaultVortexMasterTest.java  |  31 ++-
 .../reef/vortex/driver/RunningWorkersTest.java  |   6 +-
 .../vortex/driver/SchedulingPolicyTest.java     | 120 ++++++++++
 .../applications/vortex/addone/AddOneTest.java  |   2 +-
 18 files changed, 664 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index f1a55b7..206f7e3 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -20,6 +20,7 @@ package org.apache.reef.vortex.driver;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
 
@@ -74,9 +75,11 @@ final class DefaultVortexMaster implements VortexMaster {
    */
   @Override
   public void workerPreempted(final String id) {
-    final Collection<Tasklet> preemptedTasklets = 
runningWorkers.removeWorker(id);
-    for (final Tasklet tasklet : preemptedTasklets) {
-      pendingTasklets.addFirst(tasklet);
+    final Optional<Collection<Tasklet>> preemptedTasklets = 
runningWorkers.removeWorker(id);
+    if (preemptedTasklets.isPresent()) {
+      for (final Tasklet tasklet : preemptedTasklets.get()) {
+        pendingTasklets.addFirst(tasklet);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
new file mode 100644
index 0000000..6dfd8bb
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+
+/**
+ * Always select the next worker that has enough resource in a round-robin 
fashion
+ * based on the worker capacity configured.
+ */
+@NotThreadSafe
+class FirstFitSchedulingPolicy implements SchedulingPolicy {
+
+  private final int workerCapacity;
+
+  /**
+   * Keep the load information for each worker.
+   */
+  private final HashMap<String, Integer> idLoadMap = new HashMap<>();
+
+  /**
+   * An array-backed list used as a circular buffer of worker ids for search in a 
round-robin fashion.
+   */
+  private final List<String> idList = new ArrayList<>();
+
+  /**
+   * The index of the next/first worker to check.
+   */
+  private int nextIndex = 0;
+  
+
+  @Inject
+  FirstFitSchedulingPolicy(@Parameter(VortexMasterConf.WorkerCapacity.class) 
final int capacity) {
+    this.workerCapacity = capacity;
+  }
+
+  /**
+   * Checking from nextIndex, choose the first worker that fits to schedule 
the tasklet onto.
+   * @param tasklet to schedule
+   * @return the next worker that has enough resources for the tasklet
+   */
+  @Override
+  public Optional<String> trySchedule(final Tasklet tasklet) {
+    for (int i = 0; i < idList.size(); i++) {
+      final int index = (nextIndex + i) % idList.size();
+      final String workerId = idList.get(index);
+      
+      if (idLoadMap.get(workerId) < workerCapacity) {
+        nextIndex = (index + 1) % idList.size();
+        return Optional.of(workerId);
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * @param vortexWorker added
+   */
+  @Override
+  public void workerAdded(final VortexWorkerManager vortexWorker) {
+    final String workerId = vortexWorker.getId();
+    if (!idLoadMap.containsKey(workerId)) { // Ignore duplicate add.
+      idLoadMap.put(workerId, 0);
+      idList.add(nextIndex, workerId); // Prefer to schedule the new worker 
ASAP.
+    }
+  }
+
+  /**
+   * @param vortexWorker removed
+   */
+  @Override
+  public void workerRemoved(final VortexWorkerManager vortexWorker) {
+    final String workerId = vortexWorker.getId();
+    if (idLoadMap.remove(workerId) != null) { // Ignore invalid removal.
+      for (int i = 0; i < idList.size(); i++) { // This looping operation 
might degrade performance.
+        if (idList.get(i).equals(workerId)) {
+          idList.remove(i);
+          
+          if (i < nextIndex) {
+            nextIndex--;
+          } else if (nextIndex == idList.size()) {
+            nextIndex = 0;
+          }
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * @param vortexWorker that the tasklet was launched onto
+   * @param tasklet launched
+   */
+  @Override
+  public void taskletLaunched(final VortexWorkerManager vortexWorker, final 
Tasklet tasklet) {
+    final String workerId = vortexWorker.getId();
+    if (idLoadMap.containsKey(workerId)) {
+      idLoadMap.put(workerId, Math.min(workerCapacity, idLoadMap.get(workerId) 
+ 1));
+    }
+  }
+
+  /**
+   * @param vortexWorker that the tasklet completed in
+   * @param tasklet completed
+   */
+  @Override
+  public void taskletCompleted(final VortexWorkerManager vortexWorker, final 
Tasklet tasklet) {
+    final String workerId = vortexWorker.getId();
+    removeTasklet(workerId);
+  }
+
+  /**
+   * @param vortexWorker that the tasklet failed in
+   * @param tasklet failed
+   */
+  @Override
+  public void taskletFailed(final VortexWorkerManager vortexWorker, final 
Tasklet tasklet) {
+    final String workerId = vortexWorker.getId();
+    removeTasklet(workerId);
+  }
+
+  private void removeTasklet(final String workerId) {
+    if (idLoadMap.containsKey(workerId)) {
+      idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - 1));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletLauncher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletLauncher.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletLauncher.java
new file mode 100644
index 0000000..adb2c1d
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletLauncher.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Pending Tasklet Launcher.
+ */
+@DriverSide
+final class PendingTaskletLauncher implements EventHandler<Integer> {
+  private static final Logger LOG = 
Logger.getLogger(PendingTaskletLauncher.class.getName());
+
+  private final RunningWorkers runningWorkers;
+  private final PendingTasklets pendingTasklets;
+
+  @Inject
+  private PendingTaskletLauncher(final RunningWorkers runningWorkers,
+                                 final PendingTasklets pendingTasklets) {
+    this.runningWorkers = runningWorkers;
+    this.pendingTasklets = pendingTasklets;
+  }
+
+  /**
+   * Repeatedly take a tasklet from the pending queue and launch it via 
RunningWorkers.
+   */
+  @Override
+  public void onNext(final Integer integer) {
+    while (!runningWorkers.isTerminated()) {
+      try {
+        final Tasklet tasklet = pendingTasklets.takeFirst(); // blocks when no 
tasklet exists
+        runningWorkers.launchTasklet(tasklet); // blocks when no worker exists
+      } catch (InterruptedException e) {
+        LOG.log(Level.INFO, "Interrupted upon termination");
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java
deleted file mode 100644
index 6a0b8b0..0000000
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/PendingTaskletScheduler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.vortex.driver;
-
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.wake.EventHandler;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Pending Tasklet Scheduler.
- */
-@DriverSide
-final class PendingTaskletScheduler implements EventHandler<Integer> {
-  private static final Logger LOG = 
Logger.getLogger(PendingTaskletScheduler.class.getName());
-
-  private final RunningWorkers runningWorkers;
-  private final PendingTasklets pendingTasklets;
-
-  @Inject
-  private PendingTaskletScheduler(final RunningWorkers runningWorkers,
-                                  final PendingTasklets pendingTasklets) {
-    this.runningWorkers = runningWorkers;
-    this.pendingTasklets = pendingTasklets;
-  }
-
-  /**
-   * Repeatedly take a tasklet from the pending queue and schedule/launch it 
via RunningWorkers.
-   */
-  @Override
-  public void onNext(final Integer integer) {
-    while (!runningWorkers.isTerminated()) {
-      try {
-        final Tasklet tasklet = pendingTasklets.takeFirst(); // blocks when no 
tasklet exists
-        runningWorkers.launchTasklet(tasklet); // blocks when no worker exists
-      } catch (InterruptedException e) {
-        LOG.log(Level.INFO, "Interrupted upon termination");
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
new file mode 100644
index 0000000..2f71cd7
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.reef.util.Optional;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import javax.inject.Inject;
+
+/**
+ * Randomly select a running worker for scheduling a tasklet,
+ * without considering worker load or capacity.
+ */
+@NotThreadSafe
+class RandomSchedulingPolicy implements SchedulingPolicy {
+  private final Random rand = new Random();
+
+  /**
+   * Keep the worker ids in an array for fast random selection.
+   * 
+   * Add/removal from the array require O(n) complexity.
+   */
+  private final List<String> idList = new ArrayList<>();
+
+  @Inject
+  RandomSchedulingPolicy() {
+  }
+
+  /**
+   * @param tasklet to schedule
+   * @return a random worker
+   */
+  @Override
+  public Optional<String> trySchedule(final Tasklet tasklet) {
+    if (idList.isEmpty()) {
+      return Optional.empty();
+    } else {
+      final int index = rand.nextInt(idList.size());
+      return Optional.of(idList.get(index));
+    }
+  }
+
+  /**
+   * @param vortexWorker added
+   */
+  @Override
+  public void workerAdded(final VortexWorkerManager vortexWorker) {
+    if (idList.indexOf(vortexWorker.getId()) == -1) { // Ignore duplicate add.
+      idList.add(vortexWorker.getId());
+    }
+  }
+
+  /**
+   * @param vortexWorker removed
+   */
+  @Override
+  public void workerRemoved(final VortexWorkerManager vortexWorker) {
+    idList.remove(vortexWorker.getId()); // Ignore invalid removal.
+  }
+
+  /**
+   * Do nothing.
+   */
+  @Override
+  public void taskletLaunched(final VortexWorkerManager vortexWorker, final 
Tasklet tasklet) {
+    // Do nothing
+  }
+
+  /**
+   * Do nothing.
+   */
+  @Override
+  public void taskletCompleted(final VortexWorkerManager vortexWorker, final 
Tasklet tasklet) {
+    // Do nothing
+  }
+
+  /**
+   * Do nothing.
+   */
+  @Override
+  public void taskletFailed(final VortexWorkerManager vortexWorker, final 
Tasklet tasklet) {
+    // Do nothing
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index f712786..01be3a6 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -19,14 +19,17 @@
 package org.apache.reef.vortex.driver;
 
 import net.jcip.annotations.ThreadSafe;
+
 import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.util.Optional;
 
 import javax.inject.Inject;
+
 import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Keeps track of all running VortexWorkers and Tasklets.
@@ -37,22 +40,24 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 final class RunningWorkers {
   // RunningWorkers and its locks
   private final HashMap<String, VortexWorkerManager> runningWorkers = new 
HashMap<>(); // Running workers/tasklets
-  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
-  private final Lock readLock = rwLock.readLock(); // Need to acquire this to 
launch/complete tasklets
-  private final Lock writeLock = rwLock.writeLock(); // Need to acquire this 
to add/remove worker
-  private final Condition noRunningWorker = writeLock.newCondition(); // When 
there's no running worker
+  private final Lock lock = new ReentrantLock();
+  private final Condition noWorkerOrResource = lock.newCondition();
 
   // To keep track of workers that are preempted before acknowledged
   private final Set<String> removedBeforeAddedWorkers = new HashSet<>();
 
   // Terminated
-  private volatile boolean terminated = false;
+  private boolean terminated = false;
+
+  // Scheduling policy
+  private final SchedulingPolicy schedulingPolicy;
 
   /**
    * RunningWorkers constructor.
    */
   @Inject
-  RunningWorkers() {
+  RunningWorkers(final SchedulingPolicy schedulingPolicy) {
+    this.schedulingPolicy = schedulingPolicy;
   }
 
   /**
@@ -60,54 +65,49 @@ final class RunningWorkers {
    * Parameter: Called exactly once per vortexWorkerManager.
    */
   void addWorker(final VortexWorkerManager vortexWorkerManager) {
-    if (!terminated) {
-      writeLock.lock();
-      try {
-        if (!terminated) { // Make sure it is really terminated
-          if 
(!removedBeforeAddedWorkers.contains(vortexWorkerManager.getId())) {
-            this.runningWorkers.put(vortexWorkerManager.getId(), 
vortexWorkerManager);
-
-            // Notify (possibly) waiting scheduler
-            if (runningWorkers.size() == 1) {
-              noRunningWorker.signalAll();
-            }
-          }
-          return;
+    lock.lock();
+    try {
+      if (!terminated) {
+        if (!removedBeforeAddedWorkers.contains(vortexWorkerManager.getId())) {
+          this.runningWorkers.put(vortexWorkerManager.getId(), 
vortexWorkerManager);
+          this.schedulingPolicy.workerAdded(vortexWorkerManager);
+
+          // Notify (possibly) waiting scheduler
+          noWorkerOrResource.signal();
         }
-      } finally {
-        writeLock.unlock();
+      } else {
+        // Terminate the worker
+        vortexWorkerManager.terminate();
       }
+    } finally {
+      lock.unlock();
     }
-
-    // Terminate the worker
-    vortexWorkerManager.terminate();
   }
 
   /**
    * Concurrency: Called by multiple threads.
    * Parameter: Called exactly once per id.
    */
-  Collection<Tasklet> removeWorker(final String id) {
-    if (!terminated) {
-      writeLock.lock();
-      try {
-        if (!terminated) { // Make sure it is really terminated
-          final VortexWorkerManager vortexWorkerManager = 
this.runningWorkers.remove(id);
-          if (vortexWorkerManager != null) {
-            return vortexWorkerManager.removed();
-          } else {
-            // Called before addWorker (e.g. RM preempted the resource before 
the Evaluator started)
-            removedBeforeAddedWorkers.add(id);
-            return new ArrayList<>(0);
-          }
+  Optional<Collection<Tasklet>> removeWorker(final String id) {
+    lock.lock();
+    try {
+      if (!terminated) {
+        final VortexWorkerManager vortexWorkerManager = 
this.runningWorkers.remove(id);
+        if (vortexWorkerManager != null) {
+          this.schedulingPolicy.workerRemoved(vortexWorkerManager);
+          return Optional.ofNullable(vortexWorkerManager.removed());
+        } else {
+          // Called before addWorker (e.g. RM preempted the resource before 
the Evaluator started)
+          removedBeforeAddedWorkers.add(id);
+          return Optional.empty();
         }
-      } finally {
-        writeLock.unlock();
+      } else {
+        // No need to return anything since it is terminated
+        return Optional.empty();
       }
+    } finally {
+      lock.unlock();
     }
-
-    // No need to return anything since it is terminated
-    return new ArrayList<>(0);
   }
 
   /**
@@ -115,35 +115,29 @@ final class RunningWorkers {
    * Parameter: Same tasklet can be launched multiple times.
    */
   void launchTasklet(final Tasklet tasklet) {
-    if (!terminated) {
-      readLock.lock();
-      try {
-        if (!terminated) { // Make sure it is really terminated
-          // Wait until there is a running worker
-          if (runningWorkers.isEmpty()) {
-            readLock.unlock();
-            writeLock.lock();
+    lock.lock();
+    try {
+      if (!terminated) {
+        Optional<String> workerId;
+        while(true) {
+          workerId = schedulingPolicy.trySchedule(tasklet);
+          if (!workerId.isPresent()) {
             try {
-              while (runningWorkers.isEmpty()) {
-                try {
-                  noRunningWorker.await();
-                } catch (InterruptedException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-              readLock.lock();
-            } finally {
-              writeLock.unlock();
+              noWorkerOrResource.await();
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
             }
+          } else {
+            break;
           }
-
-          final VortexWorkerManager vortexWorkerManager = 
randomlyChooseWorker();
-          vortexWorkerManager.launchTasklet(tasklet);
-          return;
         }
-      } finally {
-        readLock.unlock();
+
+        final VortexWorkerManager vortexWorkerManager = 
runningWorkers.get(workerId.get());
+        vortexWorkerManager.launchTasklet(tasklet);
+        schedulingPolicy.taskletLaunched(vortexWorkerManager, tasklet);
       }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -153,20 +147,22 @@ final class RunningWorkers {
    * (e.g. preemption message coming before tasklet completion message 
multiple times)
    */
   void completeTasklet(final String workerId,
-                              final int taskletId,
-                              final Serializable result) {
-    if (!terminated) {
-      readLock.lock();
-      try {
-        if (!terminated) { // Make sure it is really terminated
-          if (runningWorkers.containsKey(workerId)) { // Preemption can come 
before
-            this.runningWorkers.get(workerId).taskletCompleted(taskletId, 
result);
-          }
-          return;
+                       final int taskletId,
+                       final Serializable result) {
+    lock.lock();
+    try {
+      if (!terminated) {
+        if (runningWorkers.containsKey(workerId)) { // Preemption can come 
before
+          final VortexWorkerManager worker = this.runningWorkers.get(workerId);
+          final Tasklet tasklet = worker.taskletCompleted(taskletId, result);
+          this.schedulingPolicy.taskletCompleted(worker, tasklet);
+
+          // Notify (possibly) waiting scheduler
+          noWorkerOrResource.signal();
         }
-      } finally {
-        readLock.unlock();
       }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -176,60 +172,47 @@ final class RunningWorkers {
    * (e.g. preemption message coming before tasklet error message multiple 
times)
    */
   void errorTasklet(final String workerId,
-                           final int taskletId,
-                           final Exception exception) {
-    if (!terminated) {
-      readLock.lock();
-      try {
-        if (!terminated) { // Make sure it is really terminated
-          if (runningWorkers.containsKey(workerId)) { // Preemption can come 
before
-            this.runningWorkers.get(workerId).taskletThrewException(taskletId, 
exception);
-          }
-          return;
+                    final int taskletId,
+                    final Exception exception) {
+    lock.lock();
+    try {
+      if (!terminated) {
+        if (runningWorkers.containsKey(workerId)) { // Preemption can come 
before
+          final VortexWorkerManager worker = this.runningWorkers.get(workerId);
+          final Tasklet tasklet = worker.taskletThrewException(taskletId, 
exception);
+          this.schedulingPolicy.taskletFailed(worker, tasklet);
+
+          // Notify (possibly) waiting scheduler
+          noWorkerOrResource.signal();
         }
-      } finally {
-        readLock.unlock();
       }
+    } finally {
+      lock.unlock();
     }
   }
 
   void terminate() {
-    if (!terminated) {
-      writeLock.lock();
-      try {
-        if (!terminated) { // Make sure it is really terminated
-          terminated = true;
-          for (final VortexWorkerManager vortexWorkerManager : 
runningWorkers.values()) {
-            vortexWorkerManager.terminate();
-          }
-          runningWorkers.clear();
-          return;
+    lock.lock();
+    try {
+      if (!terminated) {
+        terminated = true;
+        for (final VortexWorkerManager vortexWorkerManager : 
runningWorkers.values()) {
+          vortexWorkerManager.terminate();
+          schedulingPolicy.workerRemoved(vortexWorkerManager);
         }
-      } finally {
-        writeLock.unlock();
+        runningWorkers.clear();
+      } else {
+        throw new RuntimeException("Attempting to terminate an already 
terminated RunningWorkers");
       }
+    } finally {
+      lock.unlock();
     }
-    throw new RuntimeException("Attempting to terminate an already terminated 
RunningWorkers");
   }
 
   boolean isTerminated() {
     return terminated;
   }
 
-  private VortexWorkerManager randomlyChooseWorker() {
-    final Collection<VortexWorkerManager> workers = runningWorkers.values();
-    final int index = new Random().nextInt(workers.size());
-    int i = 0;
-    for (final VortexWorkerManager vortexWorkerManager : workers) {
-      if (i == index) {
-        return vortexWorkerManager;
-      } else {
-        i++;
-      }
-    }
-    throw new RuntimeException("Bad Index");
-  }
-
   ///////////////////////////////////////// For Tests Only
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
new file mode 100644
index 0000000..cef1cb6
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+/**
+ * For choosing which worker to schedule the tasklet onto.
+ */
+@DefaultImplementation(FirstFitSchedulingPolicy.class)
+interface SchedulingPolicy {
+  /**
+   * Implementation of this method is expected to be fast.
+   * @param tasklet to schedule
+   * @return id of the worker onto which the tasklet should be scheduled, or an 
empty Optional if there's none
+   */
+  Optional<String> trySchedule(final Tasklet tasklet);
+
+  /**
+   * Worker added.
+   */
+  void workerAdded(final VortexWorkerManager vortexWorker);
+
+  /**
+   * Worker removed.
+   */
+  void workerRemoved(final VortexWorkerManager vortexWorker);
+
+  /**
+   * Tasklet launched.
+   */
+  void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet 
tasklet);
+
+  /**
+   * Tasklet completed.
+   */
+  void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet 
tasklet);
+
+  /**
+   * Tasklet failed.
+   */
+  void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet 
tasklet);
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
index a7b9106..5c8244d 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexConfHelper.java
@@ -42,7 +42,8 @@ public final class VortexConfHelper {
                                             final Class<? extends VortexStart> 
vortexStart,
                                             final int numOfWorkers,
                                             final int workerMemory,
-                                            final int workerCores) {
+                                            final int workerCores,
+                                            final int workerCapacity) {
     final Configuration vortexDriverConf = DriverConfiguration.CONF
         .set(DriverConfiguration.GLOBAL_LIBRARIES, 
EnvironmentUtils.getClassLocation(VortexDriver.class))
         .set(DriverConfiguration.ON_DRIVER_STARTED, 
VortexDriver.StartHandler.class)
@@ -57,6 +58,7 @@ public final class VortexConfHelper {
         .set(VortexMasterConf.WORKER_NUM, numOfWorkers)
         .set(VortexMasterConf.WORKER_MEM, workerMemory)
         .set(VortexMasterConf.WORKER_CORES, workerCores)
+        .set(VortexMasterConf.WORKER_CAPACITY, workerCapacity)
         .set(VortexMasterConf.VORTEX_START, vortexStart)
         .set(VortexMasterConf.NUM_OF_VORTEX_START_THERAD, 
DEFAULT_NUM_OF_VORTEX_START_THERAD) // fixed to 1 for now
         .build();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index ef3061b..977302f 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -74,14 +74,14 @@ final class VortexDriver {
                        final VortexMaster vortexMaster,
                        final VortexStart vortexStart,
                        final VortexStartExecutor vortexStartExecutor,
-                       final PendingTaskletScheduler pendingTaskletScheduler,
+                       final PendingTaskletLauncher pendingTaskletLauncher,
                        @Parameter(VortexMasterConf.WorkerMem.class) final int 
workerMem,
                        @Parameter(VortexMasterConf.WorkerNum.class) final int 
workerNum,
                        @Parameter(VortexMasterConf.WorkerCores.class) final 
int workerCores,
                        
@Parameter(VortexMasterConf.NumberOfVortexStartThreads.class) final int 
numOfStartThreads) {
     this.vortexStartEStage = new ThreadPoolStage<>(vortexStartExecutor, 
numOfStartThreads);
     this.vortexStart = vortexStart;
-    this.pendingTaskletSchedulerEStage = new 
SingleThreadStage<>(pendingTaskletScheduler, 1);
+    this.pendingTaskletSchedulerEStage = new 
SingleThreadStage<>(pendingTaskletLauncher, 1);
     this.evaluatorRequestor = evaluatorRequestor;
     this.vortexMaster = vortexMaster;
     this.vortexRequestor = vortexRequestor;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
index b288824..c7461e5 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexLauncher.java
@@ -43,12 +43,18 @@ public final class VortexLauncher {
                                            final Class<? extends VortexStart> 
vortexUserCode,
                                            final int numOfWorkers,
                                            final int workerMemory,
-                                           final int workerCores) {
+                                           final int workerCores,
+                                           final int workerCapacity) {
     final Configuration runtimeConf = LocalRuntimeConfiguration.CONF
         .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, 
MAX_NUMBER_OF_EVALUATORS)
         .build();
-    final Configuration vortexConf =
-        VortexConfHelper.getVortexConf(jobName, vortexUserCode, numOfWorkers, 
workerMemory, workerCores);
+    final Configuration vortexConf = VortexConfHelper.getVortexConf(
+        jobName,
+        vortexUserCode,
+        numOfWorkers,
+        workerMemory,
+        workerCores,
+        workerCapacity);
     return launch(runtimeConf, vortexConf);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
index 358dcb7..054afe5 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMasterConf.java
@@ -22,10 +22,7 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
-import org.apache.reef.tang.formats.ConfigurationModule;
-import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
-import org.apache.reef.tang.formats.RequiredImpl;
-import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.tang.formats.*;
 import org.apache.reef.vortex.api.VortexStart;
 
 /**
@@ -56,6 +53,13 @@ public final class VortexMasterConf extends 
ConfigurationModuleBuilder {
   }
 
   /**
+   * Worker Capacity.
+   */
+  @NamedParameter(doc = "Worker Capacity")
+  final class WorkerCapacity implements Name<Integer> {
+  }
+
+  /**
    * Number of Vortex Start Threads.
    */
   @NamedParameter(doc = "Number of Vortex Start Threads")
@@ -78,6 +82,11 @@ public final class VortexMasterConf extends 
ConfigurationModuleBuilder {
   public static final RequiredParameter<Integer> WORKER_CORES = new 
RequiredParameter<>();
 
   /**
+   * Worker Capacity.
+   */
+  public static final OptionalParameter<Integer> WORKER_CAPACITY = new 
OptionalParameter<>();
+
+  /**
    * Vortex Start.
    */
   public static final RequiredImpl<VortexStart> VORTEX_START = new 
RequiredImpl<>();
@@ -94,6 +103,7 @@ public final class VortexMasterConf extends 
ConfigurationModuleBuilder {
       .bindNamedParameter(WorkerNum.class, WORKER_NUM)
       .bindNamedParameter(WorkerMem.class, WORKER_MEM)
       .bindNamedParameter(WorkerCores.class, WORKER_CORES)
+      .bindNamedParameter(WorkerCapacity.class, WORKER_CAPACITY)
       .bindImplementation(VortexStart.class, VORTEX_START)
       .bindNamedParameter(NumberOfVortexStartThreads.class, 
NUM_OF_VORTEX_START_THERAD)
       .build();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index 5aa4759..3b11ad6 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -18,24 +18,24 @@
  */
 package org.apache.reef.vortex.driver;
 
-import net.jcip.annotations.ThreadSafe;
+import net.jcip.annotations.NotThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.vortex.common.TaskletExecutionRequest;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
 
 /**
  * Representation of a VortexWorkerManager in Driver.
  */
-@ThreadSafe
+@NotThreadSafe
 @DriverSide
 class VortexWorkerManager {
   private final VortexRequestor vortexRequestor;
   private final RunningTask reefTask;
-  private final ConcurrentHashMap<Integer, Tasklet> runningTasklets = new 
ConcurrentHashMap<>();
+  private final HashMap<Integer, Tasklet> runningTasklets = new HashMap<>();
 
   VortexWorkerManager(final VortexRequestor vortexRequestor, final RunningTask 
reefTask) {
     this.vortexRequestor = vortexRequestor;
@@ -51,23 +51,22 @@ class VortexWorkerManager {
     vortexRequestor.send(reefTask, taskletExecutionRequest);
   }
 
-  <TOutput extends Serializable>
-      void taskletCompleted(final Integer taskletId, final TOutput result) {
+  <TOutput extends Serializable> Tasklet taskletCompleted(final Integer 
taskletId, final TOutput result) {
     final Tasklet<?, TOutput> tasklet = runningTasklets.remove(taskletId);
-    if (tasklet != null) { // Tasklet should complete/error only once
-      tasklet.completed(result);
-    }
+    assert(tasklet != null); // Tasklet should complete/error only once
+    tasklet.completed(result);
+    return tasklet;
   }
 
-  void taskletThrewException(final Integer taskletId, final Exception 
exception) {
+  Tasklet taskletThrewException(final Integer taskletId, final Exception 
exception) {
     final Tasklet tasklet = runningTasklets.remove(taskletId);
-    if (tasklet != null) { // Tasklet should complete/error only once
-      tasklet.threwException(exception);
-    }
+    assert(tasklet != null); // Tasklet should complete/error only once
+    tasklet.threwException(exception);
+    return tasklet;
   }
 
   Collection<Tasklet> removed() {
-    return runningTasklets.values();
+    return runningTasklets.isEmpty() ? null : runningTasklets.values();
   }
 
   void terminate() {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java
index 4e27ba9..25573fc 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/addone/AddOne.java
@@ -31,6 +31,6 @@ final class AddOne {
    * Launch the vortex job, passing appropriate arguments.
    */
   public static void main(final String[] args) {
-    VortexLauncher.launchLocal("Vortex_Example_AddOne", AddOneStart.class, 2, 
1024, 4);
+    VortexLauncher.launchLocal("Vortex_Example_AddOne", AddOneStart.class, 2, 
1024, 4, 2000);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java
index 565bce2..c4e3b6f 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/hello/HelloVortex.java
@@ -31,6 +31,6 @@ final class HelloVortex {
    * Launch the vortex job, passing appropriate arguments.
    */
   public static void main(final String[] args) {
-    VortexLauncher.launchLocal("Vortex_Example_HelloVortex", 
HelloVortexStart.class, 1, 1024, 1);
+    VortexLauncher.launchLocal("Vortex_Example_HelloVortex", 
HelloVortexStart.class, 1, 1024, 1, 2000);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index f2c2f92..c5c0b30 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -41,13 +41,13 @@ public class DefaultVortexMasterTest {
   public void testSingleTaskletNoFailure() throws Exception {
     final VortexFunction vortexFunction = testUtil.newFunction();
     final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
-    final RunningWorkers runningWorkers = new RunningWorkers();
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
     final PendingTasklets pendingTasklets = new PendingTasklets();
     final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets);
 
     vortexMaster.workerAllocated(vortexWorkerManager1);
     final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, 
null);
-    final ArrayList<Integer> taskletIds = scheduleTasklets(runningWorkers, 
pendingTasklets, 1);
+    final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, 
pendingTasklets, 1);
     for (final int taskletId : taskletIds) {
       vortexMaster.taskletCompleted(vortexWorkerManager1.getId(), taskletId, 
null);
     }
@@ -63,14 +63,14 @@ public class DefaultVortexMasterTest {
     final VortexFunction vortexFunction = testUtil.newFunction();
     final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
     final VortexWorkerManager vortexWorkerManager2 = testUtil.newWorker();
-    final RunningWorkers runningWorkers = new RunningWorkers();
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
     final PendingTasklets pendingTasklets = new PendingTasklets();
     final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets);
 
     // Allocate worker & tasklet and schedule
     vortexMaster.workerAllocated(vortexWorkerManager1);
     final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, 
null);
-    final ArrayList<Integer> taskletIds1 = scheduleTasklets(runningWorkers, 
pendingTasklets, 1);
+    final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, 
pendingTasklets, 1);
 
     // Preemption!
     vortexMaster.workerPreempted(vortexWorkerManager1.getId());
@@ -78,7 +78,7 @@ public class DefaultVortexMasterTest {
 
     // New resource allocation and scheduling
     vortexMaster.workerAllocated(vortexWorkerManager2);
-    final ArrayList<Integer> taskletIds2 = scheduleTasklets(runningWorkers, 
pendingTasklets, 1);
+    final ArrayList<Integer> taskletIds2 = launchTasklets(runningWorkers, 
pendingTasklets, 1);
     assertEquals("Both lists need to contain the same single tasklet id", 
taskletIds1, taskletIds2);
 
     // Completed?
@@ -95,13 +95,13 @@ public class DefaultVortexMasterTest {
   public void testMultipleTaskletsFailure() throws Exception {
     // The tasklets that need to be executed
     final ArrayList<VortexFuture> vortexFutures = new ArrayList<>();
-    final RunningWorkers runningWorkers = new RunningWorkers();
+    final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
     final PendingTasklets pendingTasklets = new PendingTasklets();
     final DefaultVortexMaster vortexMaster = new 
DefaultVortexMaster(runningWorkers, pendingTasklets);
 
     // Allocate iniital evaluators (will all be preempted later...)
     final List<VortexWorkerManager> initialWorkers = new ArrayList<>();
-    final int numOfWorkers = 10;
+    final int numOfWorkers = 100;
     for (int i = 0; i < numOfWorkers; i++) {
       final VortexWorkerManager vortexWorkerManager = testUtil.newWorker();
       initialWorkers.add(vortexWorkerManager);
@@ -113,7 +113,7 @@ public class DefaultVortexMasterTest {
     for (int i = 0; i < numOfTasklets; i++) {
       vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), 
null));
     }
-    final ArrayList<Integer> taskletIds1 = scheduleTasklets(runningWorkers, 
pendingTasklets, numOfTasklets);
+    final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, 
pendingTasklets, numOfTasklets);
 
     // Preempt all evaluators
     for (int i = 0; i < numOfWorkers; i++) {
@@ -124,7 +124,7 @@ public class DefaultVortexMasterTest {
     for (int i = 0; i < numOfWorkers; i++) {
       vortexMaster.workerAllocated(testUtil.newWorker());
     }
-    final ArrayList<Integer> taskletIds2 = scheduleTasklets(runningWorkers, 
pendingTasklets, numOfTasklets);
+    final ArrayList<Integer> taskletIds2 = launchTasklets(runningWorkers, 
pendingTasklets, numOfTasklets);
     assertEquals("Must contain same tasklet ids", new HashSet<>(taskletIds1), 
new HashSet<>(taskletIds2));
 
     // Completed?
@@ -139,19 +139,18 @@ public class DefaultVortexMasterTest {
   }
 
   /**
-   * Schedule specified number of tasklets.
-   * @return ids of scheduled tasklets
+   * Launch specified number of tasklets as a substitute for 
PendingTaskletLauncher.
+   * @return ids of launched tasklets
    */
-  private ArrayList<Integer> scheduleTasklets(final RunningWorkers 
runningWorkers,
-                                              final PendingTasklets 
pendingTasklets,
-                                              final int numOfTasklets) throws 
InterruptedException {
+  private ArrayList<Integer> launchTasklets(final RunningWorkers 
runningWorkers,
+                                            final PendingTasklets 
pendingTasklets,
+                                            final int numOfTasklets) throws 
InterruptedException {
     final ArrayList<Integer> taskletIds = new ArrayList<>();
     for (int i = 0; i < numOfTasklets; i++) {
       final Tasklet tasklet = pendingTasklets.takeFirst(); // blocks when no 
tasklet exists
       assertNotNull("Tasklet should exist in the pending queue", tasklet);
-
+      runningWorkers.launchTasklet(tasklet); // blocks when no resource exists
       taskletIds.add(tasklet.getId());
-      runningWorkers.launchTasklet(tasklet); // blocks when no worker exists
     }
     return taskletIds;
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
index 03c671e..7cd0847 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
@@ -30,7 +30,7 @@ import static org.junit.Assert.assertTrue;
  * Test Possible Race Conditions.
  */
 public class RunningWorkersTest {
-  private final RunningWorkers runningWorkers = new RunningWorkers();
+  private final RunningWorkers runningWorkers = new RunningWorkers(new 
RandomSchedulingPolicy());
   private final TestUtil testUtil = new TestUtil();
 
   /**
@@ -40,7 +40,7 @@ public class RunningWorkersTest {
   @Test
   public void removeExecutorAndAddExecutor() throws Exception {
     final VortexWorkerManager vortexWorkerManager = testUtil.newWorker();
-    assertEquals("Must be no running tasklets", 0, 
runningWorkers.removeWorker(vortexWorkerManager.getId()).size());
+    assertFalse("Must be no running tasklets", 
runningWorkers.removeWorker(vortexWorkerManager.getId()).isPresent());
     runningWorkers.addWorker(vortexWorkerManager);
     assertFalse("Executor should not be running", 
runningWorkers.isWorkerRunning(vortexWorkerManager.getId()));
   }
@@ -55,7 +55,7 @@ public class RunningWorkersTest {
     final Tasklet tasklet = testUtil.newTasklet();
     runningWorkers.addWorker(vortexWorkerManager);
     runningWorkers.launchTasklet(tasklet); // blocks when no worker exists
-    final Collection<Tasklet> tasklets = 
runningWorkers.removeWorker(vortexWorkerManager.getId());
+    final Collection<Tasklet> tasklets = 
runningWorkers.removeWorker(vortexWorkerManager.getId()).get();
     assertEquals("Only 1 Tasklet must have been running", 1, tasklets.size());
     assertTrue("This Tasklet must have been running", 
tasklets.contains(tasklet));
     runningWorkers.completeTasklet(vortexWorkerManager.getId(), 
tasklet.getId(), null);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
new file mode 100644
index 0000000..fea4235
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/SchedulingPolicyTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.driver;
+
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test SchedulingPolicy.
+ */
+public class SchedulingPolicyTest {
+  private final TestUtil testUtil = new TestUtil();
+
+  /**
+   * Test common traits of different scheduling policies.
+   */
+  @Test
+  public void testCommon() throws Exception {
+    commonPolicyTests(new RandomSchedulingPolicy());
+    commonPolicyTests(new FirstFitSchedulingPolicy(10));
+  }
+
+  /**
+   * Test FirstFitSchedulingPolicy without preemption events.
+   */
+  @Test
+  public void testFirstFitNoPreemption() throws Exception {
+    final int workerCapacity = 1;
+    final int numOfWorkers = 5;
+    final FirstFitSchedulingPolicy policy = new 
FirstFitSchedulingPolicy(workerCapacity);
+
+    // Add workers
+    final Deque<VortexWorkerManager> workers = new ArrayDeque<>();
+    for (int i = 0; i < numOfWorkers; i++) {
+      final VortexWorkerManager worker = testUtil.newWorker();
+      workers.addFirst(worker);
+      policy.workerAdded(worker);
+    }
+
+    // Launch 1 tasklet per worker
+    for (final VortexWorkerManager worker : workers) {
+      final Tasklet tasklet = testUtil.newTasklet();
+      assertEquals("This should be the first fit", worker.getId(), 
policy.trySchedule(tasklet).get());
+      policy.taskletLaunched(worker, tasklet);
+    }
+
+    // When all workers are full...
+    assertFalse("All workers should be full", 
policy.trySchedule(testUtil.newTasklet()).isPresent());
+  }
+
+  /**
+   * Test FirstFitSchedulingPolicy with preemption events.
+   */
+  @Test
+  public void testFirstFitPreemptions() throws Exception {
+    final int workerCapacity = 1;
+    final int numOfWorkers = 10;
+    final FirstFitSchedulingPolicy policy = new 
FirstFitSchedulingPolicy(workerCapacity);
+
+    // Add workers and make the odd ones full
+    final ArrayDeque<VortexWorkerManager> evenWorkers = new ArrayDeque<>();
+    for (int i = 0; i < numOfWorkers; i++) {
+      final VortexWorkerManager worker = testUtil.newWorker();
+      policy.workerAdded(worker);
+
+      if (i % 2 == 1) {
+        policy.taskletLaunched(worker, testUtil.newTasklet());
+      } else {
+        evenWorkers.addFirst(worker);
+      }
+    }
+
+    // Check whether the policy returns even ones in order
+    for (final VortexWorkerManager worker : evenWorkers) {
+      final Tasklet tasklet = testUtil.newTasklet();
+      assertEquals("This should be the first fit", worker.getId(), 
policy.trySchedule(tasklet).get());
+      policy.taskletLaunched(worker, tasklet);
+    }
+
+    // When all workers are full...
+    assertFalse("All workers should be full", 
policy.trySchedule(testUtil.newTasklet()).isPresent());
+  }
+
+  /**
+   * Simple common tests.
+   */
+  private void commonPolicyTests(final SchedulingPolicy policy) {
+    // Initial state
+    assertFalse("No worker added yet", 
policy.trySchedule(testUtil.newTasklet()).isPresent());
+
+    // One worker added
+    final VortexWorkerManager worker = testUtil.newWorker();
+    policy.workerAdded(worker);
+    assertEquals("Only one worker exists", worker.getId(), 
policy.trySchedule(testUtil.newTasklet()).get());
+
+    // One worker removed
+    policy.workerRemoved(worker);
+    assertFalse("No worker exists", 
policy.trySchedule(testUtil.newTasklet()).isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/073f18e4/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
index f935298..1fc2d7a 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
@@ -56,7 +56,7 @@ public final class AddOneTest {
   @Test
   public void testVortexAddOne() {
     final Configuration conf =
-        VortexConfHelper.getVortexConf("TEST_Vortex_AddOneTest", 
AddOneTestStart.class, 2, 1024, 4);
+        VortexConfHelper.getVortexConf("TEST_Vortex_AddOneTest", 
AddOneTestStart.class, 2, 1024, 4, 2000);
     final LauncherStatus status = this.testEnvironment.run(conf);
     Assert.assertTrue("Job state after execution: " + status, 
status.isSuccess());
   }

Reply via email to