sanha closed pull request #11: [NEMO-42] Make SchedulerRunner reactive, Ensure 
reverse-topological ordering in scheduling
URL: https://github.com/apache/incubator-nemo/pull/11
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub would not otherwise
display its diff after the merge, so the full diff is reproduced below:

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java 
b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 916c9e81..4440a869 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -213,7 +213,6 @@ public static Configuration getJobConf(final String[] args) 
throws IOException,
     cl.registerShortNameOfClass(JobConf.ExecutorJsonPath.class);
     cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
     cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
-    cl.registerShortNameOfClass(JobConf.SchedulerTimeoutMs.class);
     cl.registerShortNameOfClass(JobConf.MaxScheduleAttempt.class);
     cl.registerShortNameOfClass(JobConf.FileDirectory.class);
     cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class);
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java 
b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 255e8bbe..9bfc8700 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -141,13 +141,6 @@
   public final class IORequestHandleThreadsTotal implements Name<Integer> {
   }
 
-  /**
-   * Scheduler timeout in ms.
-   */
-  @NamedParameter(doc = "Scheduler timeout in ms", short_name = 
"scheduler_timeout_ms", default_value = "50")
-  public final class SchedulerTimeoutMs implements Name<Integer> {
-  }
-
   /**
    * Max number of attempts for task group scheduling.
    */
diff --git a/examples/resources/sample_executor_resources.json 
b/examples/resources/sample_executor_resources.json
index 5f5b2e0c..5765bf39 100644
--- a/examples/resources/sample_executor_resources.json
+++ b/examples/resources/sample_executor_resources.json
@@ -2,16 +2,16 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 1
+    "capacity": 5
   },
   {
     "type": "Reserved",
     "memory_mb": 512,
-    "capacity": 1
+    "capacity": 5
   },
   {
     "type": "Compute",
     "memory_mb": 512,
-    "capacity": 1
+    "capacity": 5
   }
 ]
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index 7bcb2bc3..a6c0bfd8 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -53,7 +53,7 @@ private 
MetricManagerWorker(@Parameter(MetricFlushPeriod.class) final long flush
                                                       flushingPeriod, 
TimeUnit.MILLISECONDS);
   }
 
-  private void flushMetricMessageQueueToMaster() {
+  private synchronized void flushMetricMessageQueueToMaster() {
     if (!metricMessageQueue.isEmpty()) {
       // Build batched metric messages
       int size = metricMessageQueue.size();
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 4dc2e4c5..7afb373a 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -180,7 +180,7 @@ public BlockLocationRequestHandler 
getBlockLocationHandler(final String blockId)
   /**
    * To be called when a potential producer task group is scheduled.
    * To be precise, it is called when the task group is enqueued to
-   * {@link edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue}.
+   * {@link edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection}.
    *
    * @param scheduledTaskGroupId the ID of the scheduled task group.
    */
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index f12293f0..d579b4da 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -62,7 +62,7 @@
    */
   private final SchedulingPolicy schedulingPolicy;
   private final SchedulerRunner schedulerRunner;
-  private final PendingTaskGroupQueue pendingTaskGroupQueue;
+  private final PendingTaskGroupCollection pendingTaskGroupCollection;
 
   /**
    * Other necessary components of this {@link 
edu.snu.nemo.runtime.master.RuntimeMaster}.
@@ -80,13 +80,13 @@
   @Inject
   public BatchSingleJobScheduler(final SchedulingPolicy schedulingPolicy,
                                  final SchedulerRunner schedulerRunner,
-                                 final PendingTaskGroupQueue 
pendingTaskGroupQueue,
+                                 final PendingTaskGroupCollection 
pendingTaskGroupCollection,
                                  final BlockManagerMaster blockManagerMaster,
                                  final PubSubEventHandlerWrapper 
pubSubEventHandlerWrapper,
                                  final UpdatePhysicalPlanEventHandler 
updatePhysicalPlanEventHandler) {
     this.schedulingPolicy = schedulingPolicy;
     this.schedulerRunner = schedulerRunner;
-    this.pendingTaskGroupQueue = pendingTaskGroupQueue;
+    this.pendingTaskGroupCollection = pendingTaskGroupCollection;
     this.blockManagerMaster = blockManagerMaster;
     this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
     updatePhysicalPlanEventHandler.setScheduler(this);
@@ -107,7 +107,7 @@ public void scheduleJob(final PhysicalPlan jobToSchedule, 
final JobStateManager
     this.jobStateManager = scheduledJobStateManager;
 
     schedulerRunner.scheduleJob(scheduledJobStateManager);
-    pendingTaskGroupQueue.onJobScheduled(physicalPlan);
+    pendingTaskGroupCollection.onJobScheduled(physicalPlan);
 
     LOG.info("Job to schedule: {}", jobToSchedule.getId());
 
@@ -167,6 +167,7 @@ public void onTaskGroupStateChanged(final String 
executorId, final String taskGr
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
     schedulingPolicy.onExecutorAdded(executorRepresenter);
+    schedulerRunner.onAnExecutorAvailable();
   }
 
   @Override
@@ -195,7 +196,7 @@ public void onExecutorRemoved(final String executorId) {
   @Override
   public void terminate() {
     this.schedulerRunner.terminate();
-    this.pendingTaskGroupQueue.close();
+    this.pendingTaskGroupCollection.close();
   }
 
   /**
@@ -229,7 +230,7 @@ private void scheduleNextStage(final String 
completedStageId) {
   }
 
   /**
-   * Selects the list of stages to schedule, in the order they must be added 
to {@link PendingTaskGroupQueue}.
+   * Selects the list of stages to schedule, in the order they must be added 
to {@link PendingTaskGroupCollection}.
    *
    * This is a recursive function that decides which schedule group to 
schedule upon a stage completion, or a failure.
    * It takes the currentScheduleGroupIndex as a reference point to begin 
looking for the stages to execute:
@@ -245,7 +246,7 @@ private void scheduleNextStage(final String 
completedStageId) {
    * @param currentScheduleGroupIndex
    *      the index of the schedule group that is executing/has executed when 
this method is called.
    * @return an optional of the (possibly empty) list of next schedulable 
stages, in the order they should be
-   * enqueued to {@link PendingTaskGroupQueue}.
+   * enqueued to {@link PendingTaskGroupCollection}.
    */
   private Optional<List<PhysicalStage>> selectNextStagesToSchedule(final int 
currentScheduleGroupIndex) {
     if (currentScheduleGroupIndex > initialScheduleGroup) {
@@ -376,10 +377,11 @@ private void scheduleStage(final PhysicalStage 
stageToSchedule) {
       blockManagerMaster.onProducerTaskGroupScheduled(taskGroupId);
       final int taskGroupIdx = 
RuntimeIdGenerator.getIndexFromTaskGroupId(taskGroupId);
       LOG.debug("Enquing {}", taskGroupId);
-      pendingTaskGroupQueue.enqueue(new 
ScheduledTaskGroup(physicalPlan.getId(),
+      pendingTaskGroupCollection.add(new 
ScheduledTaskGroup(physicalPlan.getId(),
           stageToSchedule.getSerializedTaskGroupDag(), taskGroupId, 
stageIncomingEdges, stageOutgoingEdges, attemptIdx,
           stageToSchedule.getContainerType(), 
logicalTaskIdToReadables.get(taskGroupIdx)));
     });
+    schedulerRunner.onATaskGroupAvailable();
   }
 
   /**
@@ -438,6 +440,7 @@ private void onTaskGroupExecutionComplete(final String 
executorId,
         scheduleNextStage(stageIdForTaskGroupUponCompletion);
       }
     }
+    schedulerRunner.onAnExecutorAvailable();
   }
 
   /**
@@ -475,6 +478,7 @@ private void onTaskGroupExecutionOnHold(final String 
executorId,
     } else {
       onTaskGroupExecutionComplete(executorId, taskGroupId, true);
     }
+    schedulerRunner.onAnExecutorAvailable();
   }
 
   /**
@@ -504,7 +508,7 @@ private void onTaskGroupExecutionFailedRecoverable(final 
String executorId, fina
           for (final PhysicalStage stage : 
physicalPlan.getStageDAG().getTopologicalSort()) {
             if (stage.getId().equals(stageId)) {
               LOG.info("Removing TaskGroups for {} before they are scheduled 
to an executor", stage.getId());
-              
pendingTaskGroupQueue.removeTaskGroupsAndDescendants(stage.getId());
+              
pendingTaskGroupCollection.removeTaskGroupsAndDescendants(stage.getId());
               stage.getTaskGroupIds().forEach(dstTaskGroupId -> {
                 if 
(jobStateManager.getTaskGroupState(dstTaskGroupId).getStateMachine().getCurrentState()
                     != TaskGroupState.State.COMPLETE) {
@@ -542,5 +546,6 @@ private void onTaskGroupExecutionFailedRecoverable(final 
String executorId, fina
       default:
         throw new UnknownFailureCauseException(new Throwable("Unknown cause: " 
+ failureCause));
     }
+    schedulerRunner.onAnExecutorAvailable();
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupQueue.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java
similarity index 65%
rename from 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupQueue.java
rename to 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java
index 5b59de41..db3a0841 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupQueue.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java
@@ -22,6 +22,8 @@
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
+import java.util.Collection;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 
 /**
@@ -31,21 +33,30 @@
  */
 @ThreadSafe
 @DriverSide
-@DefaultImplementation(SingleJobTaskGroupQueue.class)
-public interface PendingTaskGroupQueue {
+@DefaultImplementation(SingleJobTaskGroupCollection.class)
+public interface PendingTaskGroupCollection {
 
   /**
-   * Enqueues a TaskGroup to this PQ.
-   * @param scheduledTaskGroup to enqueue.
+   * Adds a TaskGroup to this collection.
+   * @param scheduledTaskGroup to add.
    */
-  void enqueue(final ScheduledTaskGroup scheduledTaskGroup);
+  void add(final ScheduledTaskGroup scheduledTaskGroup);
 
   /**
-   * Dequeues the next TaskGroup to be scheduled.
-   * @return an optional of the the next TaskGroup to be scheduled,
-   * an empty optional if no such TaskGroup exists.
+   * Removes the specified TaskGroup to be scheduled.
+   * @param taskGroupId id of the TaskGroup
+   * @return the specified TaskGroup
+   * @throws NoSuchElementException if the specified TaskGroup is not in the 
queue,
+   *                                or removing this TaskGroup breaks 
scheduling order
    */
-  Optional<ScheduledTaskGroup> dequeue();
+  ScheduledTaskGroup remove(final String taskGroupId) throws 
NoSuchElementException;
+
+  /**
+   * Peeks TaskGroups that can be scheduled according to job dependency 
priority.
+   * Changes to the queue must not reflected to the returned collection to 
avoid concurrent modification.
+   * @return TaskGroups that can be scheduled, or {@link Optional#empty()} if 
the queue is empty
+   */
+  Optional<Collection<ScheduledTaskGroup>> peekSchedulableTaskGroups();
 
   /**
    * Registers a job to this queue in case the queue needs to understand the 
topology of the job DAG.
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index 0db67adc..fa5f647b 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,23 +16,16 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.common.exception.SchedulingException;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.tang.annotations.Parameter;
 
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.stream.Collectors;
@@ -49,22 +42,8 @@
 public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
   private static final Logger LOG = 
LoggerFactory.getLogger(RoundRobinSchedulingPolicy.class.getName());
 
-  private final int scheduleTimeoutMs;
-
   private final ExecutorRegistry executorRegistry;
 
-  /**
-   * Thread safety is provided by this lock as multiple threads can call the 
methods in this class concurrently.
-   */
-  private final Lock lock;
-
-  /**
-   * Executor allocation is achieved by putting conditions for each container 
type.
-   * The condition blocks when there is no executor of the container type 
available,
-   * and is released when such an executor becomes available (either by an 
extra executor, or a task group completion).
-   */
-  private final Map<String, Condition> conditionByContainerType;
-
   /**
    * The pool of executors available for each container type.
    */
@@ -78,129 +57,73 @@
 
   @Inject
   @VisibleForTesting
-  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry,
-                                    
@Parameter(JobConf.SchedulerTimeoutMs.class) final int scheduleTimeoutMs) {
-    this.scheduleTimeoutMs = scheduleTimeoutMs;
+  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry) {
     this.executorRegistry = executorRegistry;
-    this.lock = new ReentrantLock();
     this.executorIdByContainerType = new HashMap<>();
-    this.conditionByContainerType = new HashMap<>();
     this.nextExecutorIndexByContainerType = new HashMap<>();
     initializeContainerTypeIfAbsent(ExecutorPlacementProperty.NONE); // Need 
this to avoid potential null errors
   }
 
-  public long getScheduleTimeoutMs() {
-    return scheduleTimeoutMs;
-  }
-
   @Override
   public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
                                    final JobStateManager jobStateManager) {
-    lock.lock();
-    try {
-      final String containerType = scheduledTaskGroup.getContainerType();
-      initializeContainerTypeIfAbsent(containerType);
-
-      Optional<String> executorId = selectExecutorByRR(containerType);
-      if (!executorId.isPresent()) { // If there is no available executor to 
schedule this task group now,
-        // TODO #696 Sleep Time Per Container Type in Scheduling Policy
-        final boolean executorAvailable =
-            
conditionByContainerType.get(containerType).await(scheduleTimeoutMs, 
TimeUnit.MILLISECONDS);
-        if (executorAvailable) { // if an executor has become available before 
scheduleTimeoutMs,
-          executorId = selectExecutorByRR(containerType);
-          if (executorId.isPresent()) {
-            scheduleTaskGroup(selectExecutorByRR(containerType).get(), 
scheduledTaskGroup, jobStateManager);
-            return true;
-          } else {
-            throw new SchedulingException(new Throwable("An executor must be 
available at this point"));
-          }
-        }
-        return false;
-      } else {
-        scheduleTaskGroup(executorId.get(), scheduledTaskGroup, 
jobStateManager);
-        return true;
-      }
-    } catch (final Exception e) {
-      throw new SchedulingException(e);
-    } finally {
-      lock.unlock();
+    final String containerType = scheduledTaskGroup.getContainerType();
+    initializeContainerTypeIfAbsent(containerType);
+
+    Optional<String> executorId = selectExecutorByRR(containerType);
+    if (!executorId.isPresent()) { // If there is no available executor to 
schedule this task group now,
+      return false;
+    } else {
+      scheduleTaskGroup(executorId.get(), scheduledTaskGroup, jobStateManager);
+      return true;
     }
   }
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executor) {
-    lock.lock();
-    try {
-      executorRegistry.registerRepresenter(executor);
-      final String containerType = executor.getContainerType();
-      initializeContainerTypeIfAbsent(containerType);
+    executorRegistry.registerRepresenter(executor);
+    final String containerType = executor.getContainerType();
+    initializeContainerTypeIfAbsent(containerType);
 
-      executorIdByContainerType.get(containerType)
-          .add(nextExecutorIndexByContainerType.get(containerType), 
executor.getExecutorId());
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
+    executorIdByContainerType.get(containerType)
+        .add(nextExecutorIndexByContainerType.get(containerType), 
executor.getExecutorId());
   }
 
   @Override
   public Set<String> onExecutorRemoved(final String executorId) {
-    lock.lock();
-    try {
-      executorRegistry.setRepresenterAsFailed(executorId);
-      final ExecutorRepresenter executor = 
executorRegistry.getFailedExecutorRepresenter(executorId);
-      executor.onExecutorFailed();
+    executorRegistry.setRepresenterAsFailed(executorId);
+    final ExecutorRepresenter executor = 
executorRegistry.getFailedExecutorRepresenter(executorId);
+    executor.onExecutorFailed();
 
-      final String containerType = executor.getContainerType();
+    final String containerType = executor.getContainerType();
 
-      final List<String> executorIdList = 
executorIdByContainerType.get(containerType);
-      int nextExecutorIndex = 
nextExecutorIndexByContainerType.get(containerType);
-
-      final int executorAssignmentLocation = 
executorIdList.indexOf(executorId);
-      if (executorAssignmentLocation < nextExecutorIndex) {
-        nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex 
- 1);
-      } else if (executorAssignmentLocation == nextExecutorIndex) {
-        nextExecutorIndexByContainerType.put(containerType, 0);
-      }
-      executorIdList.remove(executorId);
+    final List<String> executorIdList = 
executorIdByContainerType.get(containerType);
+    int nextExecutorIndex = 
nextExecutorIndexByContainerType.get(containerType);
 
-      return Collections.unmodifiableSet(executor.getFailedTaskGroups());
-    } finally {
-      lock.unlock();
+    final int executorAssignmentLocation = executorIdList.indexOf(executorId);
+    if (executorAssignmentLocation < nextExecutorIndex) {
+      nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex - 
1);
+    } else if (executorAssignmentLocation == nextExecutorIndex) {
+      nextExecutorIndexByContainerType.put(containerType, 0);
     }
+    executorIdList.remove(executorId);
+
+    return Collections.unmodifiableSet(executor.getFailedTaskGroups());
   }
 
   @Override
   public void onTaskGroupExecutionComplete(final String executorId, final 
String taskGroupId) {
-    lock.lock();
-    try {
-      final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
-      executor.onTaskGroupExecutionComplete(taskGroupId);
-      LOG.info("{" + taskGroupId + "} completed in [" + executorId + "]");
-
-      // the scheduler thread may be waiting for a free slot...
-      final String containerType = executor.getContainerType();
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
+    final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
+    executor.onTaskGroupExecutionComplete(taskGroupId);
+    LOG.info("{" + taskGroupId + "} completed in [" + executorId + "]");
   }
 
   @Override
   public void onTaskGroupExecutionFailed(final String executorId, final String 
taskGroupId) {
-    lock.lock();
-    try {
-      final ExecutorRepresenter executor = 
executorRegistry.getExecutorRepresenter(executorId);
-
-      executor.onTaskGroupExecutionFailed(taskGroupId);
-      LOG.info("{" + taskGroupId + "} failed in [" + executorId + "]");
+    final ExecutorRepresenter executor = 
executorRegistry.getExecutorRepresenter(executorId);
 
-      // the scheduler thread may be waiting for a free slot...
-      final String containerType = executor.getContainerType();
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
+    executor.onTaskGroupExecutionFailed(taskGroupId);
+    LOG.info("{" + taskGroupId + "} failed in [" + executorId + "]");
   }
 
   @Override
@@ -276,13 +199,5 @@ private boolean hasFreeSlot(final ExecutorRepresenter 
executor) {
   private void initializeContainerTypeIfAbsent(final String containerType) {
     executorIdByContainerType.putIfAbsent(containerType, new ArrayList<>());
     nextExecutorIndexByContainerType.putIfAbsent(containerType, 0);
-    conditionByContainerType.putIfAbsent(containerType, lock.newCondition());
-  }
-
-  private void signalPossiblyWaitingScheduler(final String 
typeOfContainerWithNewFreeSlot) {
-    conditionByContainerType.get(typeOfContainerWithNewFreeSlot).signal();
-    if 
(!typeOfContainerWithNewFreeSlot.equals(ExecutorPlacementProperty.NONE)) {
-      conditionByContainerType.get(ExecutorPlacementProperty.NONE).signal();
-    }
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 65f44dcc..deef9d87 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -27,7 +27,8 @@
 
 /**
  * Only two threads call scheduling code: RuntimeMaster thread (RMT), and 
SchedulerThread(ST).
- * RMT and ST meet only at two points: SchedulingPolicy, and 
PendingTaskGroupQueue, which are synchronized(ThreadSafe).
+ * RMT and ST meet only at two points: SchedulingPolicy, and 
PendingTaskGroupCollection,
+ * which are synchronized(ThreadSafe).
  * Other scheduler-related classes that are accessed by only one of the two 
threads are not synchronized(NotThreadSafe).
  *
  * Receives jobs to execute and schedules
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index c6079d6c..6806d743 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -23,6 +23,10 @@
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,22 +43,38 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerRunner.class.getName());
   private final Map<String, JobStateManager> jobStateManagers;
   private final SchedulingPolicy schedulingPolicy;
-  private final PendingTaskGroupQueue pendingTaskGroupQueue;
+  private final PendingTaskGroupCollection pendingTaskGroupCollection;
   private final ExecutorService schedulerThread;
   private boolean initialJobScheduled;
   private boolean isTerminated;
+  private final DelayedSignalingCondition 
mustCheckSchedulingAvailabilityOrSchedulerTerminated
+      = new DelayedSignalingCondition();
 
   @Inject
   public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
-                         final PendingTaskGroupQueue pendingTaskGroupQueue) {
+                         final PendingTaskGroupCollection 
pendingTaskGroupCollection) {
     this.jobStateManagers = new HashMap<>();
-    this.pendingTaskGroupQueue = pendingTaskGroupQueue;
+    this.pendingTaskGroupCollection = pendingTaskGroupCollection;
     this.schedulingPolicy = schedulingPolicy;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new 
Thread(runnable, "SchedulerRunner"));
     this.initialJobScheduled = false;
     this.isTerminated = false;
   }
 
+  /**
+   * Signals to the condition on executor availability.
+   */
+  public void onAnExecutorAvailable() {
+    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
+  }
+
+  /**
+   * Signals to the condition on TaskGroup availability.
+   */
+  public void onATaskGroupAvailable() {
+    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
+  }
+
   /**
    * Begin scheduling a job.
    * @param jobStateManager the corresponding {@link JobStateManager}
@@ -74,6 +94,7 @@ void scheduleJob(final JobStateManager jobStateManager) {
   void terminate() {
     schedulingPolicy.terminate();
     isTerminated = true;
+    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
   }
 
   /**
@@ -82,28 +103,43 @@ void terminate() {
   private final class SchedulerThread implements Runnable {
     @Override
     public void run() {
-      while (!isTerminated) {
-        try {
-          Optional<ScheduledTaskGroup> nextTaskGroupToSchedule;
-          do {
-            nextTaskGroupToSchedule = pendingTaskGroupQueue.dequeue();
-          } while (!nextTaskGroupToSchedule.isPresent());
+      // Run the first iteration unconditionally
+      mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
 
-          final JobStateManager jobStateManager = 
jobStateManagers.get(nextTaskGroupToSchedule.get().getJobId());
-          final boolean isScheduled =
-              
schedulingPolicy.scheduleTaskGroup(nextTaskGroupToSchedule.get(), 
jobStateManager);
+      while (!isTerminated) {
+        // Iteration guard
+        mustCheckSchedulingAvailabilityOrSchedulerTerminated.await();
 
-          if (!isScheduled) {
-            LOG.info("Failed to assign an executor for {} before the timeout: 
{}",
-                new Object[]{nextTaskGroupToSchedule.get().getTaskGroupId(),
-                    schedulingPolicy.getScheduleTimeoutMs()});
+        final Collection<ScheduledTaskGroup> schedulableTaskGroups = 
pendingTaskGroupCollection
+            .peekSchedulableTaskGroups().orElse(null);
+        if (schedulableTaskGroups == null) {
+          // TaskGroup queue is empty
+          LOG.debug("PendingTaskGroupCollection is empty. Awaiting for more 
TaskGroups...");
+          continue;
+        }
 
-            // Put this TaskGroup back to the queue since we failed to 
schedule it.
-            pendingTaskGroupQueue.enqueue(nextTaskGroupToSchedule.get());
+        int numScheduledTaskGroups = 0;
+        for (final ScheduledTaskGroup schedulableTaskGroup : 
schedulableTaskGroups) {
+          final JobStateManager jobStateManager = 
jobStateManagers.get(schedulableTaskGroup.getJobId());
+          LOG.debug("Trying to schedule {}...", 
schedulableTaskGroup.getTaskGroupId());
+          final boolean isScheduled =
+              schedulingPolicy.scheduleTaskGroup(schedulableTaskGroup, 
jobStateManager);
+          if (isScheduled) {
+            LOG.debug("Successfully scheduled {}", 
schedulableTaskGroup.getTaskGroupId());
+            
pendingTaskGroupCollection.remove(schedulableTaskGroup.getTaskGroupId());
+            numScheduledTaskGroups++;
+          } else {
+            LOG.debug("Failed to schedule {}", 
schedulableTaskGroup.getTaskGroupId());
           }
-        } catch (final Exception e) {
-          e.printStackTrace();
-          throw e;
+        }
+
+        LOG.debug("Examined {} TaskGroups, scheduled {} TaskGroups",
+            schedulableTaskGroups.size(), numScheduledTaskGroups);
+        if (schedulableTaskGroups.size() == numScheduledTaskGroups) {
+          // Scheduled all TaskGroups in the stage
+          // Immediately run next iteration to check whether there is another 
schedulable stage
+          LOG.debug("Trying to schedule next Stage in the ScheduleGroup (if 
any)...");
+          mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
         }
       }
       jobStateManagers.values().forEach(jobStateManager -> {
@@ -116,4 +152,45 @@ public void run() {
       LOG.info("SchedulerRunner Terminated!");
     }
   }
+
+  /**
+   * A {@link Condition} that allows 'delayed' signaling.
+   */
+  private final class DelayedSignalingCondition {
+    private final AtomicBoolean hasDelayedSignal = new AtomicBoolean(false);
+    private final Lock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    /**
+     * Signals to this condition. If no thread is awaiting for this condition,
+     * signaling is delayed until the first next {@link #await} invocation.
+     */
+    public void signal() {
+      lock.lock();
+      try {
+        hasDelayedSignal.set(true);
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Awaits to this condition. The thread will awake when there is a delayed 
signal,
+     * or the next first {@link #signal} invocation.
+     */
+    public void await() {
+      lock.lock();
+      try {
+        if (!hasDelayedSignal.get()) {
+          condition.await();
+        }
+        hasDelayedSignal.set(false);
+      } catch (final InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 59f9adf7..061adae9 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -32,12 +32,6 @@
 @DefaultImplementation(SourceLocationAwareSchedulingPolicy.class)
 public interface SchedulingPolicy {
 
-  /**
-   * Returns this scheduling policy's timeout before an executor assignment.
-   * @return the timeout in milliseconds.
-   */
-  long getScheduleTimeoutMs();
-
   /**
    * Attempts to schedule the given taskGroup to an executor according to this 
policy.
    * If there is no executor available for the taskGroup, it waits for an 
executor to be assigned before it times out.
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
new file mode 100644
index 00000000..51c5db79
--- /dev/null
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * {@link PendingTaskGroupCollection} implementation.
+ * This class provides two-level scheduling by keeping track of schedulable 
stages and stage-TaskGroup membership.
+ * {@link #peekSchedulableTaskGroups()} returns collection of TaskGroups which 
belong to one of the schedulable stages.
+ */
+@ThreadSafe
+@DriverSide
+public final class SingleJobTaskGroupCollection implements 
PendingTaskGroupCollection {
+  private PhysicalPlan physicalPlan;
+
+  /**
+   * Pending TaskGroups awaiting to be scheduled for each stage.
+   */
+  private final ConcurrentMap<String, Map<String, ScheduledTaskGroup>> 
stageIdToPendingTaskGroups;
+
+  /**
+   * Stages with TaskGroups that have not yet been scheduled.
+   */
+  private final BlockingDeque<String> schedulableStages;
+
+  @Inject
+  public SingleJobTaskGroupCollection() {
+    stageIdToPendingTaskGroups = new ConcurrentHashMap<>();
+    schedulableStages = new LinkedBlockingDeque<>();
+  }
+
+  @Override
+  public synchronized void add(final ScheduledTaskGroup scheduledTaskGroup) {
+    final String stageId = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
+
+    stageIdToPendingTaskGroups.compute(stageId, (s, taskGroupIdToTaskGroup) -> 
{
+      if (taskGroupIdToTaskGroup == null) {
+        final Map<String, ScheduledTaskGroup> taskGroupIdToTaskGroupMap = new 
HashMap<>();
+        taskGroupIdToTaskGroupMap.put(scheduledTaskGroup.getTaskGroupId(), 
scheduledTaskGroup);
+        updateSchedulableStages(stageId, 
scheduledTaskGroup.getContainerType());
+        return taskGroupIdToTaskGroupMap;
+      } else {
+        taskGroupIdToTaskGroup.put(scheduledTaskGroup.getTaskGroupId(), 
scheduledTaskGroup);
+        return taskGroupIdToTaskGroup;
+      }
+    });
+  }
+
+  /**
+   * Removes the specified TaskGroup to be scheduled.
+   * The specified TaskGroup should belong to the collection from {@link 
#peekSchedulableTaskGroups()}.
+   * @param taskGroupId id of the TaskGroup
+   * @return the specified TaskGroup
+   * @throws NoSuchElementException if the specified TaskGroup is not in the 
queue,
+   *                                or removing this TaskGroup breaks 
scheduling order
+   *                                (i.e. does not belong to the collection 
from {@link #peekSchedulableTaskGroups()}).
+   */
+  @Override
+  public synchronized ScheduledTaskGroup remove(final String taskGroupId) 
throws NoSuchElementException {
+    final String stageId = schedulableStages.peekFirst();
+    if (stageId == null) {
+      throw new NoSuchElementException("No schedulable stage in TaskGroup 
queue");
+    }
+
+    final Map<String, ScheduledTaskGroup> pendingTaskGroupsForStage = 
stageIdToPendingTaskGroups.get(stageId);
+
+    if (pendingTaskGroupsForStage == null) {
+      throw new RuntimeException(String.format("Stage %s not found in 
TaskGroup queue", stageId));
+    }
+    final ScheduledTaskGroup taskGroupToSchedule = 
pendingTaskGroupsForStage.remove(taskGroupId);
+    if (taskGroupToSchedule == null) {
+      throw new NoSuchElementException(String.format("TaskGroup %s not found 
in TaskGroup queue", taskGroupId));
+    }
+    if (pendingTaskGroupsForStage.isEmpty()) {
+      if (!schedulableStages.pollFirst().equals(stageId)) {
+        throw new RuntimeException(String.format("Expected stage %s to be 
polled", stageId));
+      }
+      stageIdToPendingTaskGroups.remove(stageId);
+      stageIdToPendingTaskGroups.forEach((scheduledStageId, taskGroups) ->
+          updateSchedulableStages(scheduledStageId, 
taskGroups.values().iterator().next().getContainerType()));
+    }
+
+    return taskGroupToSchedule;
+  }
+
+  /**
+   * Peeks TaskGroups that can be scheduled.
+   * The result is {@link Optional#empty()} if no stage is currently 
schedulable.
+   * @return collection of TaskGroups which belong to one of the schedulable 
stages,
+   *         or {@link Optional#empty} if the queue is empty
+   */
+  @Override
+  public synchronized Optional<Collection<ScheduledTaskGroup>> 
peekSchedulableTaskGroups() {
+    final String stageId = schedulableStages.peekFirst();
+    if (stageId == null) {
+      return Optional.empty();
+    }
+
+    final Map<String, ScheduledTaskGroup> pendingTaskGroupsForStage = 
stageIdToPendingTaskGroups.get(stageId);
+    if (pendingTaskGroupsForStage == null) {
+      throw new RuntimeException(String.format("Stage %s not found in 
stageIdToPendingTaskGroups map", stageId));
+    }
+    return Optional.of(new ArrayList<>(pendingTaskGroupsForStage.values()));
+  }
+
+  /**
+   * Removes a stage and its descendant stages from this PQ.
+   * @param stageId for the stage to begin the removal recursively.
+   */
+  @Override
+  public synchronized void removeTaskGroupsAndDescendants(final String 
stageId) {
+    removeStageAndChildren(stageId);
+  }
+
+  /**
+   * Recursively removes a stage and its children stages from this PQ.
+   * @param stageId for the stage to begin the removal recursively.
+   */
+  private synchronized void removeStageAndChildren(final String stageId) {
+    schedulableStages.remove(stageId);
+    stageIdToPendingTaskGroups.remove(stageId);
+
+    physicalPlan.getStageDAG().getChildren(stageId).forEach(
+        physicalStage -> removeStageAndChildren(physicalStage.getId()));
+  }
+
+  /**
+   * Updates the two-level data structure by examining a new candidate stage.
+   * If there are no stages with higher priority, the candidate can be made 
schedulable.
+   *
+   * NOTE: This method provides the "line up" between stages, by assigning 
priorities,
+   * serving as the key to the "priority" implementation of this class.
+   * @param candidateStageId for the stage that can potentially be scheduled.
+   * @param candidateStageContainerType for the stage that can potentially be 
scheduled.
+   */
+  private synchronized void updateSchedulableStages(
+      final String candidateStageId, final String candidateStageContainerType) 
{
+    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = 
physicalPlan.getStageDAG();
+
+    if (isSchedulable(candidateStageId, candidateStageContainerType)) {
+      // Check for ancestor stages that became schedulable due to 
candidateStage's absence from the queue.
+      jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
+        if (schedulableStages.contains(ancestorStage.getId())) {
+          // Remove the ancestor stage if it is of the same container type.
+          if 
(candidateStageContainerType.equals(ancestorStage.getContainerType())) {
+            schedulableStages.remove(ancestorStage.getId());
+          }
+        }
+      });
+      if (!schedulableStages.contains(candidateStageId)) {
+        schedulableStages.addLast(candidateStageId);
+      }
+    }
+  }
+
+  /**
+   * Determines whether the given candidate stage is schedulable immediately 
or not.
+   * @param candidateStageId for the stage that can potentially be scheduled.
+   * @param candidateStageContainerType for the stage that can potentially be 
scheduled.
+   * @return true if schedulable, false otherwise.
+   */
+  private synchronized boolean isSchedulable(final String candidateStageId, 
final String candidateStageContainerType) {
+    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = 
physicalPlan.getStageDAG();
+    for (final PhysicalStage descendantStage : 
jobDAG.getDescendants(candidateStageId)) {
+      if (schedulableStages.contains(descendantStage.getId())) {
+        if 
(candidateStageContainerType.equals(descendantStage.getContainerType())) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public synchronized void onJobScheduled(final PhysicalPlan 
physicalPlanForJob) {
+    this.physicalPlan = physicalPlanForJob;
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    return schedulableStages.isEmpty();
+  }
+
+  @Override
+  public synchronized void close() {
+    schedulableStages.clear();
+    stageIdToPendingTaskGroups.clear();
+  }
+}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupQueue.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupQueue.java
deleted file mode 100644
index c605390b..00000000
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupQueue.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.exception.SchedulingException;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.annotations.audience.DriverSide;
-
-import javax.inject.Inject;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.function.BiFunction;
-
-/**
- * Keep tracks of all pending task groups.
- * This class provides two-level queue scheduling by prioritizing TaskGroups 
of certain stages to be scheduled first.
- * Stages that are mutually independent alternate turns in scheduling each of 
their TaskGroups.
- * This PQ assumes that stages/task groups of higher priorities are never 
enqueued without first removing
- * those of lower priorities (which is how Scheduler behaves) for simplicity.
- */
-@ThreadSafe
-@DriverSide
-public final class SingleJobTaskGroupQueue implements PendingTaskGroupQueue {
-  private PhysicalPlan physicalPlan;
-
-  /**
-   * Pending TaskGroups awaiting to be scheduled for each stage.
-   */
-  private final ConcurrentMap<String, Deque<ScheduledTaskGroup>> 
stageIdToPendingTaskGroups;
-
-  /**
-   * Stages with TaskGroups that have not yet been scheduled.
-   */
-  private final BlockingDeque<String> schedulableStages;
-
-  @Inject
-  public SingleJobTaskGroupQueue() {
-    stageIdToPendingTaskGroups = new ConcurrentHashMap<>();
-    schedulableStages = new LinkedBlockingDeque<>();
-  }
-
-  @Override
-  public void enqueue(final ScheduledTaskGroup scheduledTaskGroup) {
-    final String stageId = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
-
-    synchronized (stageIdToPendingTaskGroups) {
-      stageIdToPendingTaskGroups.compute(stageId,
-          new BiFunction<String, Deque<ScheduledTaskGroup>, 
Deque<ScheduledTaskGroup>>() {
-            @Override
-            public Deque<ScheduledTaskGroup> apply(final String s,
-                                                   final 
Deque<ScheduledTaskGroup> scheduledTaskGroups) {
-              if (scheduledTaskGroups == null) {
-                final Deque<ScheduledTaskGroup> pendingTaskGroupsForStage = 
new ArrayDeque<>();
-                pendingTaskGroupsForStage.add(scheduledTaskGroup);
-                updateSchedulableStages(stageId, 
scheduledTaskGroup.getContainerType());
-                return pendingTaskGroupsForStage;
-              } else {
-                scheduledTaskGroups.add(scheduledTaskGroup);
-                return scheduledTaskGroups;
-              }
-            }
-          });
-    }
-  }
-
-  /**
-   * Dequeues the next TaskGroup to be scheduled according to job dependency 
priority.
-   * @return the next TaskGroup to be scheduled
-   */
-  @Override
-  public Optional<ScheduledTaskGroup> dequeue() {
-    ScheduledTaskGroup taskGroupToSchedule = null;
-    final String stageId;
-    try {
-      stageId = schedulableStages.takeFirst();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      throw new SchedulingException(new Throwable("An exception occurred while 
trying to dequeue the next TaskGroup"));
-    }
-
-    synchronized (stageIdToPendingTaskGroups) {
-      final Deque<ScheduledTaskGroup> pendingTaskGroupsForStage = 
stageIdToPendingTaskGroups.get(stageId);
-
-      if (pendingTaskGroupsForStage == null) {
-        schedulableStages.addLast(stageId);
-      } else {
-        taskGroupToSchedule = pendingTaskGroupsForStage.poll();
-        if (pendingTaskGroupsForStage.isEmpty()) {
-          stageIdToPendingTaskGroups.remove(stageId);
-          stageIdToPendingTaskGroups.forEach((scheduledStageId, taskGroupList) 
->
-              updateSchedulableStages(scheduledStageId, 
taskGroupList.getFirst().getContainerType()));
-        } else {
-          schedulableStages.addLast(stageId);
-        }
-      }
-    }
-
-    return (taskGroupToSchedule == null) ? Optional.empty()
-        : Optional.of(taskGroupToSchedule);
-  }
-
-  /**
-   * Removes a stage and its descendant stages from this PQ.
-   * @param stageId for the stage to begin the removal recursively.
-   */
-  @Override
-  public void removeTaskGroupsAndDescendants(final String stageId) {
-    synchronized (stageIdToPendingTaskGroups) {
-      removeStageAndChildren(stageId);
-    }
-  }
-
-  /**
-   * Recursively removes a stage and its children stages from this PQ.
-   * @param stageId for the stage to begin the removal recursively.
-   */
-  private void removeStageAndChildren(final String stageId) {
-    schedulableStages.remove(stageId);
-    stageIdToPendingTaskGroups.remove(stageId);
-
-    physicalPlan.getStageDAG().getChildren(stageId).forEach(
-        physicalStage -> removeStageAndChildren(physicalStage.getId()));
-  }
-
-  /**
-   * Updates the two-level PQ by examining a new candidate stage.
-   * If there are no stages with higher priority, the candidate can be made 
schedulable.
-   *
-   * NOTE: This method provides the "line up" between stages, by assigning 
priorities,
-   * serving as the key to the "priority" implementation of this class.
-   * @param candidateStageId for the stage that can potentially be scheduled.
-   * @param candidateStageContainerType for the stage that can potentially be 
scheduled.
-   */
-  private void updateSchedulableStages(final String candidateStageId, final 
String candidateStageContainerType) {
-    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = 
physicalPlan.getStageDAG();
-
-    if (isSchedulable(candidateStageId, candidateStageContainerType)) {
-      // Check for ancestor stages that became schedulable due to 
candidateStage's absence from the queue.
-      jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
-        if (schedulableStages.contains(ancestorStage.getId())) {
-          // Remove the ancestor stage if it is of the same container type.
-          if 
(candidateStageContainerType.equals(ancestorStage.getContainerType())) {
-            schedulableStages.remove(ancestorStage.getId());
-          }
-        }
-      });
-      if (!schedulableStages.contains(candidateStageId)) {
-        schedulableStages.addLast(candidateStageId);
-      }
-    }
-  }
-
-  /**
-   * Determines whether the given candidate stage is schedulable immediately 
or not.
-   * @param candidateStageId for the stage that can potentially be scheduled.
-   * @param candidateStageContainerType for the stage that can potentially be 
scheduled.
-   * @return true if schedulable, false otherwise.
-   */
-  private boolean isSchedulable(final String candidateStageId, final String 
candidateStageContainerType) {
-    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = 
physicalPlan.getStageDAG();
-    for (final PhysicalStage descendantStage : 
jobDAG.getDescendants(candidateStageId)) {
-      if (schedulableStages.contains(descendantStage.getId())) {
-        if 
(candidateStageContainerType.equals(descendantStage.getContainerType())) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public void onJobScheduled(final PhysicalPlan physicalPlanForJob) {
-    this.physicalPlan = physicalPlanForJob;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    synchronized (stageIdToPendingTaskGroups) {
-      for (final String stageId : schedulableStages) {
-        if (!stageIdToPendingTaskGroups.get(stageId).isEmpty()) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  @Override
-  public void close() {
-    schedulableStages.clear();
-    stageIdToPendingTaskGroups.clear();
-  }
-}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index e02c2596..9c692531 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.ir.Readable;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
@@ -30,9 +29,6 @@
 import javax.inject.Inject;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -48,9 +44,6 @@
 
   private final ExecutorRegistry executorRegistry;
   private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
-  private final long scheduleTimeoutMs;
-  private final Lock lock = new ReentrantLock();
-  private final Condition moreExecutorsAvailableCondition = 
lock.newCondition();
 
   /**
    * Injectable constructor for {@link SourceLocationAwareSchedulingPolicy}.
@@ -62,12 +55,6 @@ private SourceLocationAwareSchedulingPolicy(final 
ExecutorRegistry executorRegis
                                               final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
     this.executorRegistry = executorRegistry;
     this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
-    this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
-  }
-
-  @Override
-  public long getScheduleTimeoutMs() {
-    return scheduleTimeoutMs;
   }
 
   /**
@@ -82,40 +69,20 @@ public long getScheduleTimeoutMs() {
   @Override
   public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
                                    final JobStateManager jobStateManager) {
-    lock.lock();
+    Set<String> sourceLocations = Collections.emptySet();
     try {
-      Set<String> sourceLocations = Collections.emptySet();
-      try {
-        sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
-      } catch (final UnsupportedOperationException e) {
-        // do nothing
-      } catch (final Exception e) {
-        LOG.warn(String.format("Exception while trying to get source location 
for %s",
-            scheduledTaskGroup.getTaskGroupId()), e);
-      }
-      if (sourceLocations.size() == 0) {
-        // No source location information found, fall back to the 
RoundRobinSchedulingPolicy
-        return 
roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, 
jobStateManager);
-      }
-
-      long timeoutInNanoseconds = scheduleTimeoutMs * 1000000;
-      while (timeoutInNanoseconds > 0) {
-        if (scheduleToLocalNode(scheduledTaskGroup, jobStateManager, 
sourceLocations)) {
-          return true;
-        }
-        try {
-          timeoutInNanoseconds = 
moreExecutorsAvailableCondition.awaitNanos(timeoutInNanoseconds);
-          // Signals on this condition does not necessarily guarantee that the 
added executor helps scheduling the
-          // TaskGroup we are interested in. We need to await again if the 
consequent scheduling attempt still fails,
-          // until we spend the time budget specified.
-        } catch (final InterruptedException e) {
-          throw new SchedulingException(e);
-        }
-      }
-      return false;
-    } finally {
-      lock.unlock();
+      sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+    } catch (final UnsupportedOperationException e) {
+      // do nothing
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (sourceLocations.size() == 0) {
+      // No source location information found, fall back to the 
RoundRobinSchedulingPolicy
+      return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, 
jobStateManager);
     }
+
+    return scheduleToLocalNode(scheduledTaskGroup, jobStateManager, 
sourceLocations);
   }
 
   /**
@@ -128,78 +95,45 @@ public boolean scheduleTaskGroup(final ScheduledTaskGroup 
scheduledTaskGroup,
   private boolean scheduleToLocalNode(final ScheduledTaskGroup 
scheduledTaskGroup,
                                       final JobStateManager jobStateManager,
                                       final Set<String> sourceLocations) {
-    lock.lock();
-    try {
-      final List<ExecutorRepresenter> candidateExecutors =
-          
selectExecutorByContainerTypeAndNodeNames(scheduledTaskGroup.getContainerType(),
 sourceLocations);
-      if (candidateExecutors.size() == 0) {
-        return false;
-      }
-      final int randomIndex = ThreadLocalRandom.current().nextInt(0, 
candidateExecutors.size());
-      final ExecutorRepresenter selectedExecutor = 
candidateExecutors.get(randomIndex);
-
-      
jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), 
TaskGroupState.State.EXECUTING);
-      selectedExecutor.onTaskGroupScheduled(scheduledTaskGroup);
-      LOG.info("Scheduling {} (source location: {}) to {} (node name: {})", 
scheduledTaskGroup.getTaskGroupId(),
-          String.join(", ", sourceLocations), selectedExecutor.getExecutorId(),
-          selectedExecutor.getNodeName());
-      return true;
-    } finally {
-      lock.unlock();
+    final List<ExecutorRepresenter> candidateExecutors =
+        
selectExecutorByContainerTypeAndNodeNames(scheduledTaskGroup.getContainerType(),
 sourceLocations);
+    if (candidateExecutors.size() == 0) {
+      return false;
     }
+    final int randomIndex = ThreadLocalRandom.current().nextInt(0, 
candidateExecutors.size());
+    final ExecutorRepresenter selectedExecutor = 
candidateExecutors.get(randomIndex);
+
+    
jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), 
TaskGroupState.State.EXECUTING);
+    selectedExecutor.onTaskGroupScheduled(scheduledTaskGroup);
+    LOG.info("Scheduling {} (source location: {}) to {} (node name: {})", 
scheduledTaskGroup.getTaskGroupId(),
+        String.join(", ", sourceLocations), selectedExecutor.getExecutorId(),
+        selectedExecutor.getNodeName());
+    return true;
   }
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    lock.lock();
-    try {
-      moreExecutorsAvailableCondition.signal();
-      roundRobinSchedulingPolicy.onExecutorAdded(executorRepresenter);
-    } finally {
-      lock.unlock();
-    }
+    roundRobinSchedulingPolicy.onExecutorAdded(executorRepresenter);
   }
 
   @Override
   public Set<String> onExecutorRemoved(final String executorId) {
-    lock.lock();
-    try {
-      return roundRobinSchedulingPolicy.onExecutorRemoved(executorId);
-    } finally {
-      lock.unlock();
-    }
+    return roundRobinSchedulingPolicy.onExecutorRemoved(executorId);
   }
 
   @Override
   public void onTaskGroupExecutionComplete(final String executorId, final 
String taskGroupId) {
-    lock.lock();
-    try {
-      moreExecutorsAvailableCondition.signal();
-      roundRobinSchedulingPolicy.onTaskGroupExecutionComplete(executorId, 
taskGroupId);
-    } finally {
-      lock.unlock();
-    }
+    roundRobinSchedulingPolicy.onTaskGroupExecutionComplete(executorId, 
taskGroupId);
   }
 
   @Override
   public void onTaskGroupExecutionFailed(final String executorId, final String 
taskGroupId) {
-    lock.lock();
-    try {
-      moreExecutorsAvailableCondition.signal();
-      roundRobinSchedulingPolicy.onTaskGroupExecutionFailed(executorId, 
taskGroupId);
-    } finally {
-      lock.unlock();
-    }
+    roundRobinSchedulingPolicy.onTaskGroupExecutionFailed(executorId, 
taskGroupId);
   }
 
   @Override
   public void terminate() {
-    lock.lock();
-    try {
-      roundRobinSchedulingPolicy.terminate();
-    } finally {
-      lock.unlock();
-    }
+    roundRobinSchedulingPolicy.terminate();
   }
 
   /**
@@ -209,21 +143,16 @@ public void terminate() {
    *         and has an empty slot for execution
    */
   private List<ExecutorRepresenter> selectExecutorByContainerTypeAndNodeNames(
-      final String containerType, final Set<String> nodeNames) {
-    lock.lock();
-    try {
-      final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = 
executorRegistry.getRunningExecutorIds().stream()
-          .map(executorId -> 
executorRegistry.getRunningExecutorRepresenter(executorId))
-          .filter(executor -> executor.getRunningTaskGroups().size() < 
executor.getExecutorCapacity())
-          .filter(executor -> nodeNames.contains(executor.getNodeName()));
-      if (containerType.equals(ExecutorPlacementProperty.NONE)) {
-        return localNodesWithSpareCapacity.collect(Collectors.toList());
-      } else {
-        return localNodesWithSpareCapacity.filter(executor -> 
executor.getContainerType().equals(containerType))
-            .collect(Collectors.toList());
-      }
-    } finally {
-      lock.unlock();
+    final String containerType, final Set<String> nodeNames) {
+    final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = 
executorRegistry.getRunningExecutorIds().stream()
+        .map(executorId -> 
executorRegistry.getRunningExecutorRepresenter(executorId))
+        .filter(executor -> executor.getRunningTaskGroups().size() < 
executor.getExecutorCapacity())
+        .filter(executor -> nodeNames.contains(executor.getNodeName()));
+    if (containerType.equals(ExecutorPlacementProperty.NONE)) {
+      return localNodesWithSpareCapacity.collect(Collectors.toList());
+    } else {
+      return localNodesWithSpareCapacity.filter(executor -> 
executor.getContainerType().equals(containerType))
+          .collect(Collectors.toList());
     }
   }
 
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
index b4f411a6..f95c452a 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
@@ -21,7 +21,7 @@
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
+import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
@@ -104,12 +104,13 @@ public static void 
sendTaskGroupStateEventToScheduler(final Scheduler scheduler,
     sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId, newState, attemptIdx, null);
   }
 
-  public static void mockSchedulerRunner(final PendingTaskGroupQueue 
pendingTaskGroupQueue,
+  public static void mockSchedulerRunner(final PendingTaskGroupCollection 
pendingTaskGroupCollection,
                                          final SchedulingPolicy 
schedulingPolicy,
                                          final JobStateManager jobStateManager,
                                          final boolean isPartialSchedule) {
-    while (!pendingTaskGroupQueue.isEmpty()) {
-      final ScheduledTaskGroup taskGroupToSchedule = 
pendingTaskGroupQueue.dequeue().get();
+    while (!pendingTaskGroupCollection.isEmpty()) {
+      final ScheduledTaskGroup taskGroupToSchedule = 
pendingTaskGroupCollection.remove(
+          
pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
 
       schedulingPolicy.scheduleTaskGroup(taskGroupToSchedule, jobStateManager);
 
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 73a1b5d2..80f00d28 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -98,7 +98,6 @@
     SourceVertex.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
-  private static final int SCHEDULE_TIMEOUT = 1000;
   private static final DataStoreProperty.Value MEMORY_STORE = 
DataStoreProperty.Value.MemoryStore;
   private static final DataStoreProperty.Value SER_MEMORY_STORE = 
DataStoreProperty.Value.SerializedMemoryStore;
   private static final DataStoreProperty.Value LOCAL_FILE_STORE = 
DataStoreProperty.Value.LocalFileStore;
@@ -134,8 +133,8 @@ public void setUp() throws InjectionException {
     final PubSubEventHandlerWrapper pubSubEventHandler = 
mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
     final SchedulingPolicy schedulingPolicy = new RoundRobinSchedulingPolicy(
-        injector.getInstance(ExecutorRegistry.class), SCHEDULE_TIMEOUT);
-    final PendingTaskGroupQueue taskGroupQueue = new SingleJobTaskGroupQueue();
+        injector.getInstance(ExecutorRegistry.class));
+    final PendingTaskGroupCollection taskGroupQueue = new 
SingleJobTaskGroupCollection();
     final SchedulerRunner schedulerRunner = new 
SchedulerRunner(schedulingPolicy, taskGroupQueue);
     final Scheduler scheduler =
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
taskGroupQueue, master,
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index 9afe5047..c87bc818 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -81,15 +81,13 @@
   private SchedulerRunner schedulerRunner;
   private ExecutorRegistry executorRegistry;
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskGroupQueue pendingTaskGroupQueue;
+  private PendingTaskGroupCollection pendingTaskGroupCollection;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
   private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
   private PhysicalPlanGenerator physicalPlanGenerator;
 
-  private static final int TEST_TIMEOUT_MS = 500;
-
   private static final int EXECUTOR_CAPACITY = 20;
 
   // This schedule index will make sure that task group events are not ignored
@@ -103,13 +101,13 @@ public void setUp() throws Exception {
     irDAGBuilder = initializeDAGBuilder();
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
-    pendingTaskGroupQueue = new SingleJobTaskGroupQueue();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, 
TEST_TIMEOUT_MS);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskGroupQueue);
+    pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
+    schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskGroupCollection);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
pendingTaskGroupQueue,
+        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
pendingTaskGroupCollection,
             blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
index 9d067b0e..8bfd993c 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
@@ -83,7 +83,7 @@
   private ExecutorRegistry executorRegistry;
 
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskGroupQueue pendingTaskGroupQueue;
+  private PendingTaskGroupCollection pendingTaskGroupCollection;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
@@ -91,7 +91,6 @@
   private final ExecutorService serExecutorService = 
Executors.newSingleThreadExecutor();
   private PhysicalPlanGenerator physicalPlanGenerator;
 
-  private static final int TEST_TIMEOUT_MS = 500;
   private static final int MAX_SCHEDULE_ATTEMPT = 3;
 
   @Before
@@ -111,16 +110,16 @@ private void setUpExecutors(final 
Collection<ExecutorRepresenter> executors,
                               final boolean useMockSchedulerRunner) throws 
InjectionException {
     executorRegistry = 
Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
 
-    pendingTaskGroupQueue = new SingleJobTaskGroupQueue();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, 
TEST_TIMEOUT_MS);
+    pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
 
     if (useMockSchedulerRunner) {
       schedulerRunner = mock(SchedulerRunner.class);
     } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskGroupQueue);
+      schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskGroupCollection);
     }
     scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
pendingTaskGroupQueue,
+        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
pendingTaskGroupCollection,
             blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler);
 
     // Add nodes
@@ -208,14 +207,14 @@ public void testContainerRemoval() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, 
schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, 
schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
 
         // Due to round robin scheduling, "a2" is assured to have a running 
TaskGroup.
         scheduler.onExecutorRemoved("a2");
@@ -225,15 +224,15 @@ public void testContainerRemoval() throws Exception {
         }
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 
2);
 
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, 
schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else {
         // There are 2 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 2.
         // Schedule only the first TaskGroup
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, 
schedulingPolicy, jobStateManager, true);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, true);
 
         boolean first = true;
         for (final String taskGroupId : stage.getTaskGroupIds()) {
@@ -285,15 +284,15 @@ public void testOutputFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, 
schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, 
schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
@@ -304,7 +303,7 @@ public void testOutputFailure() throws Exception {
         }
 
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 
3);
-        assertFalse(pendingTaskGroupQueue.isEmpty());
+        assertFalse(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId -> {
           
assertEquals(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(),
               TaskGroupState.State.READY);
@@ -345,14 +344,14 @@ public void testInputReadFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, 
schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, 
schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
 
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index 72b19b7d..22068a59 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -41,10 +41,8 @@
 import java.util.concurrent.Executors;
 import java.util.function.Function;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.*;
 
 /**
@@ -53,8 +51,6 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobStateManager.class)
 public final class RoundRobinSchedulingPolicyTest {
-  private static final int TIMEOUT_MS = 2000;
-
   private SchedulingPolicy schedulingPolicy;
   private ExecutorRegistry executorRegistry;
   private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
@@ -68,7 +64,7 @@
   public void setUp() throws InjectionException {
     executorRegistry = 
Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
 
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, 
TIMEOUT_MS);
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
@@ -105,27 +101,6 @@ public void setUp() throws InjectionException {
     schedulingPolicy.onExecutorAdded(r);
   }
 
-  @Test
-  public void testWakeupFromAwaitByTaskGroupCompletion() {
-    final Timer timer = new Timer();
-    final List<ScheduledTaskGroup> scheduledTaskGroups =
-        convertToScheduledTaskGroups(5, new byte[0], "Stage", 
ExecutorPlacementProperty.RESERVED);
-    assertTrue(schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(0), 
jobStateManager));
-    timer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        schedulingPolicy.onTaskGroupExecutionComplete(RESERVED_EXECUTOR_ID,
-            scheduledTaskGroups.get(0).getTaskGroupId());
-      }
-    }, 1000);
-    assertTrue(schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(1), 
jobStateManager));
-  }
-
-  @Test
-  public void checkScheduleTimeout() {
-    assertEquals(schedulingPolicy.getScheduleTimeoutMs(), TIMEOUT_MS);
-  }
-
   @Test
   public void testNoneContainerType() {
     final int slots = 6;
@@ -143,67 +118,6 @@ public void testNoneContainerType() {
     assertFalse(isScheduled);
   }
 
-  @Test
-  public void testSingleCoreTwoTypesOfExecutors() {
-    final List<ScheduledTaskGroup> scheduledTaskGroupsA =
-        convertToScheduledTaskGroups(5, new byte[0], "Stage A", 
ExecutorPlacementProperty.COMPUTE);
-    final List<ScheduledTaskGroup> scheduledTaskGroupsB =
-        convertToScheduledTaskGroups(3, new byte[0], "Stage B", 
ExecutorPlacementProperty.TRANSIENT);
-
-
-    boolean a0 = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(0), 
jobStateManager);
-    assertTrue(a0);
-
-    boolean a1 = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(1), 
jobStateManager);
-    assertTrue(a1);
-
-    boolean a2 = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(2), 
jobStateManager);
-    assertTrue(a2);
-
-    boolean a3 = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(3), 
jobStateManager);
-    // After 2000 ms
-    assertFalse(a3);
-
-    schedulingPolicy.onTaskGroupExecutionComplete("a1", 
scheduledTaskGroupsA.get(0).getTaskGroupId());
-
-    a3 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(3), 
jobStateManager);
-    assertTrue(a3);
-
-    boolean a4 = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(4), 
jobStateManager);
-    // After 2000 ms
-    assertFalse(a4);
-
-    schedulingPolicy.onTaskGroupExecutionComplete("a3", 
scheduledTaskGroupsA.get(2).getTaskGroupId());
-
-    a4 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(4), 
jobStateManager);
-    assertTrue(a4);
-
-    boolean b0 = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(0), 
jobStateManager);
-    assertTrue(b0);
-
-    boolean b1 = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(1), 
jobStateManager);
-    assertTrue(b1);
-
-    boolean b2 = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(2), 
jobStateManager);
-    // After 2000 ms
-    assertFalse(b2);
-
-    schedulingPolicy.onTaskGroupExecutionComplete("b1", 
scheduledTaskGroupsB.get(0).getTaskGroupId());
-
-    b2 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(2), 
jobStateManager);
-    assertTrue(b2);
-
-    Set<String> executingTaskGroups = schedulingPolicy.onExecutorRemoved("b1");
-    assertEquals(1, executingTaskGroups.size());
-    assertEquals(scheduledTaskGroupsB.get(2).getTaskGroupId(), 
executingTaskGroups.iterator().next());
-
-    executingTaskGroups = schedulingPolicy.onExecutorRemoved("a1");
-    assertEquals(1, executingTaskGroups.size());
-    assertEquals(scheduledTaskGroupsA.get(3).getTaskGroupId(), 
executingTaskGroups.iterator().next());
-
-    verify(mockMsgSender, times(8)).send(anyObject());
-  }
-
   /**
    * Wrap a DAG of a task group into {@link ScheduledTaskGroup}s.
    *
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
index d48192fe..41bdb4dc 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
@@ -29,13 +29,14 @@
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.master.scheduler.SingleJobTaskGroupQueue;
+import edu.snu.nemo.runtime.master.scheduler.SingleJobTaskGroupCollection;
 import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -48,11 +49,11 @@
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests {@link SingleJobTaskGroupQueue}.
+ * Tests {@link SingleJobTaskGroupCollection}.
  */
 public final class SingleTaskGroupQueueTest {
   private DAGBuilder<IRVertex, IREdge> irDAGBuilder;
-  private SingleJobTaskGroupQueue pendingTaskGroupPriorityQueue;
+  private SingleJobTaskGroupCollection pendingTaskGroupPriorityQueue;
   private PhysicalPlanGenerator physicalPlanGenerator;
 
   /**
@@ -63,7 +64,7 @@
   @Before
   public void setUp() throws Exception{
     irDAGBuilder = new DAGBuilder<>();
-    pendingTaskGroupPriorityQueue = new SingleJobTaskGroupQueue();
+    pendingTaskGroupPriorityQueue = new SingleJobTaskGroupCollection();
     executorService = Executors.newFixedThreadPool(2);
 
     final Injector injector = Tang.Factory.getTang().newInjector();
@@ -72,7 +73,7 @@ public void setUp() throws Exception{
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link SingleJobTaskGroupQueue}.
+   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link SingleJobTaskGroupCollection}.
    * Tests whether the dequeued TaskGroups are according to the 
stage-dependency priority.
    */
   @Test
@@ -117,7 +118,7 @@ public void testPushPriority() throws Exception {
     final AtomicBoolean passed = new AtomicBoolean(true);
 
     // This mimics Batch Scheduler's behavior
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       // First schedule the children TaskGroups (since it is push).
       // BatchSingleJobScheduler will schedule TaskGroups in this order as 
well.
       scheduleStage(dagOf2Stages.get(1));
@@ -125,18 +126,18 @@ public void testPushPriority() throws Exception {
       scheduleStage(dagOf2Stages.get(0));
 
       countDownLatch.countDown();
-    });
+    }).get();
 
     // This mimics SchedulerRunner's behavior
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ScheduledTaskGroup dequeuedTaskGroup = 
pendingTaskGroupPriorityQueue.dequeue().get();
+        final ScheduledTaskGroup dequeuedTaskGroup = dequeue();
         
assertEquals(RuntimeIdGenerator.getStageIdFromTaskGroupId(dequeuedTaskGroup.getTaskGroupId()),
             dagOf2Stages.get(1).getId());
 
-        // Let's say we fail to schedule, and enqueue this TaskGroup back.
-        pendingTaskGroupPriorityQueue.enqueue(dequeuedTaskGroup);
+        // Let's say we fail to schedule, and add this TaskGroup back.
+        pendingTaskGroupPriorityQueue.add(dequeuedTaskGroup);
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
 
         // Now that we've dequeued all of the children TaskGroups, we should 
now start getting the parents.
@@ -148,14 +149,14 @@ public void testPushPriority() throws Exception {
       } finally {
         countDownLatch.countDown();
       }
-    });
+    }).get();
 
     countDownLatch.await();
     assertTrue(passed.get());
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link SingleJobTaskGroupQueue}.
+   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link SingleJobTaskGroupCollection}.
    * Tests whether the dequeued TaskGroups are according to the 
stage-dependency priority.
    */
   @Test
@@ -199,23 +200,23 @@ public void testPullPriority() throws Exception {
     final CountDownLatch countDownLatch = new CountDownLatch(2);
 
     // This mimics Batch Scheduler's behavior
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       // First schedule the parent TaskGroups (since it is pull).
       // BatchSingleJobScheduler will schedule TaskGroups in this order as 
well.
       scheduleStage(dagOf2Stages.get(0));
       countDownLatch.countDown();
-    });
+    }).get();
 
     // This mimics SchedulerRunner's behavior
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final ScheduledTaskGroup dequeuedTaskGroup = 
pendingTaskGroupPriorityQueue.dequeue().get();
+        final ScheduledTaskGroup dequeuedTaskGroup = dequeue();
         
assertEquals(RuntimeIdGenerator.getStageIdFromTaskGroupId(dequeuedTaskGroup.getTaskGroupId()),
             dagOf2Stages.get(0).getId());
 
-        // Let's say we fail to schedule, and enqueue this TaskGroup back.
-        pendingTaskGroupPriorityQueue.enqueue(dequeuedTaskGroup);
+        // Let's say we fail to schedule, and add this TaskGroup back.
+        pendingTaskGroupPriorityQueue.add(dequeuedTaskGroup);
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
 
         // Now that we've dequeued all of the children TaskGroups, we should 
now schedule children.
@@ -228,102 +229,18 @@ public void testPullPriority() throws Exception {
       } finally {
         countDownLatch.countDown();
       }
-    });
+    }).get();
 
     countDownLatch.await();
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link SingleJobTaskGroupQueue}.
-   * Tests whether the dequeued TaskGroups are according to the 
stage-dependency priority.
-   */
-  @Test
-  public void testPushRemoveAndAddStageDependency() throws Exception {
-    final Transform t = mock(Transform.class);
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(2));
-    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v3);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
-
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    pendingTaskGroupPriorityQueue.onJobScheduled(
-        new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap()));
-
-    final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort();
-
-    // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's 
requirements.
-    assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 
dagOf2Stages.get(1).getScheduleGroupIndex());
-
-    final CountDownLatch countDownLatch = new CountDownLatch(2);
-
-    // This mimics SchedulerRunner's behavior, but let's schedule this thread 
first this time,
-    // as opposed to testPushPriority.
-    executorService.execute(() -> {
-      try {
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ScheduledTaskGroup dequeuedTaskGroup = 
pendingTaskGroupPriorityQueue.dequeue().get();
-        
assertEquals(RuntimeIdGenerator.getStageIdFromTaskGroupId(dequeuedTaskGroup.getTaskGroupId()),
-            dagOf2Stages.get(1).getId());
-
-        // SchedulerRunner will never dequeue another TaskGroup before enquing 
back the failed TaskGroup,
-        // but just for testing purposes of PendingTGPQ...
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        // Let's say we fail to schedule, and enqueue this TaskGroup back.
-        pendingTaskGroupPriorityQueue.enqueue(dequeuedTaskGroup);
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-
-        // Now that we've dequeued all of the children TaskGroups, we should 
now start getting the parents.
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-      } catch (Exception e) {
-        e.printStackTrace();
-      } finally {
-        countDownLatch.countDown();
-      }
-    });
-
-    // This mimics Batch Scheduler's behavior
-    executorService.execute(() -> {
-      // First schedule the children TaskGroups (since it is push).
-      // BatchSingleJobScheduler will schedule TaskGroups in this order as 
well.
-      scheduleStage(dagOf2Stages.get(1));
-      // Then, schedule the parent TaskGroups.
-      scheduleStage(dagOf2Stages.get(0));
-
-      countDownLatch.countDown();
-    });
-
-    countDownLatch.await();
-  }
-
-  /**
-   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link SingleJobTaskGroupQueue}.
+   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link SingleJobTaskGroupCollection}.
    * Tests whether the dequeued TaskGroups are according to the 
stage-dependency priority,
    * while concurrently scheduling TaskGroups that have dependencies, but are 
of different container types.
    */
   @Test
-  public void testContainerTypeAwareness() throws Exception {
+  public void testWithDifferentContainerType() throws Exception {
     final Transform t = mock(Transform.class);
     final IRVertex v1 = new OperatorVertex(t);
     v1.setProperty(ParallelismProperty.of(3));
@@ -370,24 +287,23 @@ public void testContainerTypeAwareness() throws Exception 
{
     countDownLatch.countDown();
 
     // This mimics SchedulerRunner's behavior.
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       try {
-        // Since Stage-0 and Stage-1 have different container types, they 
should simply alternate turns in scheduling.
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
 
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
 
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
 
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
+
+        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
       } catch (Exception e) {
         e.printStackTrace();
       } finally {
         countDownLatch.countDown();
       }
-    });
+    }).get();
 
     countDownLatch.await();
   }
@@ -398,7 +314,7 @@ public void testContainerTypeAwareness() throws Exception {
    */
   private void scheduleStage(final PhysicalStage stage) {
     stage.getTaskGroupIds().forEach(taskGroupId ->
-        pendingTaskGroupPriorityQueue.enqueue(new ScheduledTaskGroup(
+        pendingTaskGroupPriorityQueue.add(new ScheduledTaskGroup(
             "TestPlan", stage.getSerializedTaskGroupDag(), taskGroupId, 
Collections.emptyList(),
             Collections.emptyList(), 0, stage.getContainerType(), 
Collections.emptyMap())));
   }
@@ -408,7 +324,17 @@ private void scheduleStage(final PhysicalStage stage) {
    * @return the stage name of the dequeued task group.
    */
   private String dequeueAndGetStageId() {
-    final ScheduledTaskGroup scheduledTaskGroup = 
pendingTaskGroupPriorityQueue.dequeue().get();
+    final ScheduledTaskGroup scheduledTaskGroup = dequeue();
     return 
RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
   }
+
+  /**
+   * Dequeues a scheduled task group from the task group priority queue.
+   * @return the TaskGroup dequeued
+   */
+  private ScheduledTaskGroup dequeue() {
+    final Collection<ScheduledTaskGroup> scheduledTaskGroups
+        = pendingTaskGroupPriorityQueue.peekSchedulableTaskGroups().get();
+    return 
pendingTaskGroupPriorityQueue.remove(scheduledTaskGroups.iterator().next().getTaskGroupId());
+  }
 }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index baf18cfa..143ecb4c 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -61,13 +61,14 @@
   private SpiedSchedulingPolicyWrapper<RoundRobinSchedulingPolicy> roundRobin;
   private MockJobStateManagerWrapper jobStateManager;
 
-  private void setup(final int schedulerTimeoutMs) {
+  @Before
+  public void setup() {
     final Injector injector = Tang.Factory.getTang().newInjector();
     jobStateManager = new MockJobStateManagerWrapper();
 
     final ExecutorRegistry executorRegistry = new ExecutorRegistry();
     final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy =
-        new RoundRobinSchedulingPolicy(executorRegistry, schedulerTimeoutMs);
+        new RoundRobinSchedulingPolicy(executorRegistry);
     roundRobin = new SpiedSchedulingPolicyWrapper(roundRobinSchedulingPolicy, 
jobStateManager.get());
 
     injector.bindVolatileInstance(RoundRobinSchedulingPolicy.class, 
roundRobin.get());
@@ -92,8 +93,6 @@ public void teardown() {
    */
   @Test
   public void testRoundRobinSchedulerFallback() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg0 = 
CreateScheduledTaskGroup.withoutReadables(ExecutorPlacementProperty.NONE);
     final ScheduledTaskGroup tg1 = 
CreateScheduledTaskGroup.withReadablesWithoutSourceLocations(2,
@@ -131,8 +130,6 @@ public void testRoundRobinSchedulerFallback() {
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)), 
ExecutorPlacementProperty.NONE);
@@ -156,8 +153,6 @@ public void testSourceLocationAwareSchedulingNotAvailable() 
{
    */
   @Test
   public void testSourceLocationAwareSchedulingWithContainerType() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)), 
CONTAINER_TYPE_A);
@@ -190,8 +185,6 @@ public void 
testSourceLocationAwareSchedulingWithContainerType() {
    */
   @Test
   public void testSourceLocationAwareSchedulingDoesNotOverSchedule() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg0 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)), 
CONTAINER_TYPE_A);
@@ -221,8 +214,6 @@ public void 
testSourceLocationAwareSchedulingDoesNotOverSchedule() {
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg0 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)), 
CONTAINER_TYPE_A);
@@ -247,62 +238,6 @@ public void 
testSourceLocationAwareSchedulingWithMultiSource() {
     e.assertScheduledTaskGroups(Arrays.asList(tg0, tg1, tg2, tg3));
   }
 
-  /**
-   * If there are no appropriate executors available, {@link 
SourceLocationAwareSchedulingPolicy} should await
-   * for the given amount of time, immediately waking up on executor addition.
-   */
-  @Test
-  public void testWakeupFromAwaitByExecutorAddition() {
-    // We need timeout value which is long enough.
-    setup(20000);
-    final Timer timer = new Timer();
-
-    // Prepare test scenario
-    final ScheduledTaskGroup tg = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_1)), 
CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e = new 
MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1);
-
-    // The executor will be available in 1000ms.
-    timer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        addExecutor(e);
-      }
-    }, 1000);
-    // Attempt to schedule TG must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg, 
jobStateManager.get()));
-  }
-
-  /**
-   * If there are no appropriate executors available, {@link 
SourceLocationAwareSchedulingPolicy} should await
-   * for the given amount of time, immediately waking up on TaskGroup 
completion.
-   */
-  @Test
-  public void testWakeupFromAwaitByTaskGroupCompletion() {
-    // We need timeout value which is long enough.
-    setup(20000);
-    final Timer timer = new Timer();
-
-    // Prepare test scenario
-    final ScheduledTaskGroup tg0 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_1)), 
CONTAINER_TYPE_A);
-    final ScheduledTaskGroup tg1 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_1)), 
CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e = addExecutor(new 
MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1));
-
-    // Attempt to schedule TG must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, 
jobStateManager.get()));
-    // The TaskGroup will be completed in 1000ms.
-    timer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        
sourceLocationAware.onTaskGroupExecutionComplete(e.get().getExecutorId(), 
tg0.getTaskGroupId());
-      }
-    }, 1000);
-    // Attempt to schedule TG must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, 
jobStateManager.get()));
-  }
-
   private MockExecutorRepresenterWrapper addExecutor(final 
MockExecutorRepresenterWrapper executor) {
     sourceLocationAware.onExecutorAdded(executor.get());
     return executor;
@@ -362,7 +297,7 @@ static ScheduledTaskGroup 
withReadablesWhichThrowException(final int numReadable
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
           final Readable readable = mock(Readable.class);
-          when(readable.getLocations()).thenThrow(new Exception("EXCEPTION"));
+          when(readable.getLocations()).thenThrow(new 
UnsupportedOperationException());
           readables.add(readable);
         }
         return doCreate(readables, containerType);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to