seojangho closed pull request #53: [NEMO-123] Replace PendingTaskCollection 
with pointers to ScheduleGroups
URL: https://github.com/apache/incubator-nemo/pull/53
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this pull request comes from a fork, GitHub does not display its diff
after the merge, so the diff is supplied below:

diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index d5c50e4b..ab1ece12 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -32,8 +32,9 @@ public SailfishPass() {
         new SailfishRelayReshapingPass(),
         new SailfishEdgeDataFlowModelPass(),
         new SailfishEdgeDataStorePass(),
-        new SailfishEdgeDecoderPass(),
-        new SailfishEdgeEncoderPass(),
+        // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
+        // new SailfishEdgeDecoderPass(),
+        // new SailfishEdgeEncoderPass(),
         new SailfishEdgeUsedDataHandlingPass()
     ));
   }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 21ea23a0..ce0a4ff2 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -18,6 +18,7 @@
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import org.apache.commons.lang.SerializationUtils;
 
 import java.io.Serializable;
 import java.util.List;
@@ -134,4 +135,20 @@ public int getAttemptIdx() {
   public Map<String, Readable> getIrVertexIdToReadable() {
     return irVertexIdToReadable;
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("jobId: ");
+    sb.append(jobId);
+    sb.append(" / taskId: ");
+    sb.append(taskId);
+    sb.append(" / attempt: ");
+    sb.append(attemptIdx);
+    sb.append(" / irDAG: ");
+    sb.append(SerializationUtils.deserialize(serializedIRDag));
+    sb.append("/ exec props: ");
+    sb.append(getExecutionProperties());
+    return sb.toString();
+  }
 }
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
index 93b2c83d..9d68e5a3 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
@@ -72,4 +72,18 @@ public long getOffset() {
   public long getElementsTotal() {
     return elementsTotal;
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("key: ");
+    sb.append(key);
+    sb.append("/ partitionSize: ");
+    sb.append(partitionSize);
+    sb.append("/ offset: ");
+    sb.append(offset);
+    sb.append("/ elementsTotal: ");
+    sb.append(elementsTotal);
+    return sb.toString();
+  }
 }
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 88dda9dd..73f531c4 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -131,7 +131,7 @@ public void setUp() throws InjectionException {
     final PubSubEventHandlerWrapper pubSubEventHandler = 
mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
     final SchedulingPolicy schedulingPolicy = 
injector.getInstance(CompositeSchedulingPolicy.class);
-    final PendingTaskCollection taskQueue = new SingleJobTaskCollection();
+    final PendingTaskCollectionPointer taskQueue = new 
PendingTaskCollectionPointer();
     final SchedulerRunner schedulerRunner = new 
SchedulerRunner(schedulingPolicy, taskQueue, executorRegistry);
     final Scheduler scheduler = new BatchSingleJobScheduler(
         schedulerRunner, taskQueue, master, pubSubEventHandler, 
updatePhysicalPlanEventHandler, executorRegistry);
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 9c24671c..1da2e722 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -176,9 +176,6 @@ public BlockLocationRequestHandler 
getBlockLocationHandler(final String blockId)
 
   /**
    * To be called when a potential producer task is scheduled.
-   * To be precise, it is called when the task is enqueued to
-   * {@link edu.snu.nemo.runtime.master.scheduler.PendingTaskCollection}.
-   *
    * @param scheduledTaskId the ID of the scheduled task.
    */
   public void onProducerTaskScheduled(final String scheduledTaskId) {
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 3506063c..4bd8d4fb 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -46,7 +46,7 @@
 import static edu.snu.nemo.runtime.common.state.TaskState.State.READY;
 
 /**
- * (WARNING) Only a single dedicated thread should use the public methods of 
this class.
+ * (CONCURRENCY) Only a single dedicated thread should use the public methods 
of this class.
  * (i.e., runtimeMasterThread in RuntimeMaster)
  *
  * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute 
and schedules the Tasks.
@@ -60,7 +60,7 @@
    * Components related to scheduling the given job.
    */
   private final SchedulerRunner schedulerRunner;
-  private final PendingTaskCollection pendingTaskCollection;
+  private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorRegistry executorRegistry;
 
   /**
@@ -78,13 +78,13 @@
 
   @Inject
   public BatchSingleJobScheduler(final SchedulerRunner schedulerRunner,
-                                 final PendingTaskCollection 
pendingTaskCollection,
+                                 final PendingTaskCollectionPointer 
pendingTaskCollectionPointer,
                                  final BlockManagerMaster blockManagerMaster,
                                  final PubSubEventHandlerWrapper 
pubSubEventHandlerWrapper,
                                  final UpdatePhysicalPlanEventHandler 
updatePhysicalPlanEventHandler,
                                  final ExecutorRegistry executorRegistry) {
     this.schedulerRunner = schedulerRunner;
-    this.pendingTaskCollection = pendingTaskCollection;
+    this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.blockManagerMaster = blockManagerMaster;
     this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
     updatePhysicalPlanEventHandler.setScheduler(this);
@@ -96,26 +96,28 @@ public BatchSingleJobScheduler(final SchedulerRunner 
schedulerRunner,
   }
 
   /**
-   * Receives a job to schedule.
-   * @param jobToSchedule the physical plan for the job.
-   * @param scheduledJobStateManager to keep track of the submitted job's 
states.
+   * @param physicalPlanOfJob of the job.
+   * @param jobStateManagerOfJob of the job.
    */
   @Override
-  public void scheduleJob(final PhysicalPlan jobToSchedule, final 
JobStateManager scheduledJobStateManager) {
-    this.physicalPlan = jobToSchedule;
-    this.jobStateManager = scheduledJobStateManager;
+  public void scheduleJob(final PhysicalPlan physicalPlanOfJob, final 
JobStateManager jobStateManagerOfJob) {
+    if (this.physicalPlan != null || this.jobStateManager != null) {
+      throw new IllegalStateException("scheduleJob() has been called more than 
once");
+    }
+
+    this.physicalPlan = physicalPlanOfJob;
+    this.jobStateManager = jobStateManagerOfJob;
 
-    schedulerRunner.scheduleJob(scheduledJobStateManager);
+    schedulerRunner.scheduleJob(jobStateManagerOfJob);
     schedulerRunner.runSchedulerThread();
-    pendingTaskCollection.onJobScheduled(physicalPlan);
 
-    LOG.info("Job to schedule: {}", jobToSchedule.getId());
+    LOG.info("Job to schedule: {}", physicalPlanOfJob.getId());
 
-    this.initialScheduleGroup = 
jobToSchedule.getStageDAG().getVertices().stream()
+    this.initialScheduleGroup = 
physicalPlanOfJob.getStageDAG().getVertices().stream()
         .mapToInt(stage -> stage.getScheduleGroupIndex())
         .min().getAsInt();
 
-    scheduleRootStages();
+    scheduleNextScheduleGroup(initialScheduleGroup);
   }
 
   @Override
@@ -211,7 +213,8 @@ public void onExecutorRemoved(final String executorId) {
       // Schedule a stage after marking the necessary tasks to 
failed_recoverable.
       // The stage for one of the tasks that failed is a starting point to look
       // for the next stage to be scheduled.
-      
scheduleNextStage(RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next()));
+      scheduleNextScheduleGroup(getSchedulingIndexOfStage(
+          
RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next())));
     }
   }
 
@@ -219,133 +222,115 @@ public void onExecutorRemoved(final String executorId) {
   public void terminate() {
     this.schedulerRunner.terminate();
     this.executorRegistry.terminate();
-    this.pendingTaskCollection.close();
-  }
-
-  /**
-   * Schedule stages in initial schedule group, in reverse-topological order.
-   */
-  private void scheduleRootStages() {
-    final List<Stage> rootStages =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage 
->
-            stage.getScheduleGroupIndex() == initialScheduleGroup)
-            .collect(Collectors.toList());
-    Collections.reverse(rootStages);
-    rootStages.forEach(this::scheduleStage);
   }
 
   /**
-   * Schedules the next stage to execute after a stage completion.
-   * @param completedStageId the ID of the stage that just completed and 
triggered this scheduling.
+   * Schedules the next schedule group to execute.
+   * @param referenceIndex of the schedule group.
    */
-  private void scheduleNextStage(final String completedStageId) {
-    final Stage completeOrFailedStage = getStageById(completedStageId);
-    final Optional<List<Stage>> nextStagesToSchedule =
-        
selectNextStagesToSchedule(completeOrFailedStage.getScheduleGroupIndex());
-
-    if (nextStagesToSchedule.isPresent()) {
-      LOG.info("Scheduling: ScheduleGroup {}", 
nextStagesToSchedule.get().get(0).getScheduleGroupIndex());
-
-      nextStagesToSchedule.get().forEach(this::scheduleStage);
+  private void scheduleNextScheduleGroup(final int referenceIndex) {
+    final Optional<List<Stage>> nextScheduleGroupToSchedule = 
selectNextScheduleGroupToSchedule(referenceIndex);
+
+    if (nextScheduleGroupToSchedule.isPresent()) {
+      LOG.info("Scheduling: ScheduleGroup {}", 
nextScheduleGroupToSchedule.get());
+      final List<Task> tasksToSchedule = 
nextScheduleGroupToSchedule.get().stream()
+          .flatMap(stage -> getSchedulableTasks(stage).stream())
+          .collect(Collectors.toList());
+      pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
+      schedulerRunner.onNewPendingTaskCollectionAvailable();
     } else {
       LOG.info("Skipping this round as the next schedulable stages have 
already been scheduled.");
     }
   }
 
   /**
-   * Selects the list of stages to schedule, in the order they must be added 
to {@link PendingTaskCollection}.
+   * Selects the next stage to schedule.
+   * It takes the referenceScheduleGroupIndex as a reference point to begin 
looking for the stages to execute:
    *
-   * This is a recursive function that decides which schedule group to 
schedule upon a stage completion, or a failure.
-   * It takes the currentScheduleGroupIndex as a reference point to begin 
looking for the stages to execute:
    * a) returns the failed_recoverable stage(s) of the earliest schedule 
group, if it(they) exists.
    * b) returns an empty optional if there are no schedulable stages at the 
moment.
    *    - if the current schedule group is still executing
    *    - if an ancestor schedule group is still executing
    * c) returns the next set of schedulable stages (if the current schedule 
group has completed execution)
    *
-   * The current implementation assumes that the stages that belong to the 
same schedule group are
-   * either mutually independent, or connected by a "push" edge.
-   *
-   * @param currentScheduleGroupIndex
+   * @param referenceScheduleGroupIndex
    *      the index of the schedule group that is executing/has executed when 
this method is called.
-   * @return an optional of the (possibly empty) list of next schedulable 
stages, in the order they should be
-   * enqueued to {@link PendingTaskCollection}.
+   * @return an optional of the (possibly empty) next schedulable stage
    */
-  private Optional<List<Stage>> selectNextStagesToSchedule(final int 
currentScheduleGroupIndex) {
-    if (currentScheduleGroupIndex > initialScheduleGroup) {
+  private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int 
referenceScheduleGroupIndex) {
+    // Recursively check the previous schedule group.
+    if (referenceScheduleGroupIndex > initialScheduleGroup) {
       final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
-          selectNextStagesToSchedule(currentScheduleGroupIndex - 1);
+          selectNextScheduleGroupToSchedule(referenceScheduleGroupIndex - 1);
       if (ancestorStagesFromAScheduleGroup.isPresent()) {
+        // Nothing to schedule from the previous schedule group.
         return ancestorStagesFromAScheduleGroup;
       }
     }
 
+    // Return the schedulable stage list in reverse-topological order
+    // since the stages that belong to the same schedule group are mutually 
independent,
+    // or connected by a "push" edge, where scheduling the children stages 
first is preferred.
+    final List<Stage> reverseTopoStages = 
physicalPlan.getStageDAG().getTopologicalSort();
+    Collections.reverse(reverseTopoStages);
+
     // All previous schedule groups are complete, we need to check for the 
current schedule group.
-    final List<Stage> currentScheduleGroup =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage 
->
-            stage.getScheduleGroupIndex() == currentScheduleGroupIndex)
-            .collect(Collectors.toList());
-    List<Stage> stagesToSchedule = new LinkedList<>();
-    boolean allStagesComplete = true;
-
-    // We need to reschedule failed_recoverable stages.
-    for (final Stage stageToCheck : currentScheduleGroup) {
-      final StageState.State stageState = 
jobStateManager.getStageState(stageToCheck.getId());
-      switch (stageState) {
-        case FAILED_RECOVERABLE:
-          stagesToSchedule.add(stageToCheck);
-          allStagesComplete = false;
-          break;
-        case READY:
-        case EXECUTING:
-          allStagesComplete = false;
-          break;
-        default:
-          break;
+    final List<Stage> currentScheduleGroup = reverseTopoStages
+        .stream()
+        .filter(stage -> stage.getScheduleGroupIndex() == 
referenceScheduleGroupIndex)
+        .collect(Collectors.toList());
+    final boolean allStagesOfThisGroupComplete = currentScheduleGroup
+        .stream()
+        .map(Stage::getId)
+        .map(jobStateManager::getStageState)
+        .allMatch(state -> state.equals(StageState.State.COMPLETE));
+
+    if (!allStagesOfThisGroupComplete) {
+      LOG.info("There are remaining stages in the current schedule group, {}", 
referenceScheduleGroupIndex);
+      final List<Stage> stagesToSchedule = currentScheduleGroup
+          .stream()
+          .filter(stage -> {
+            final StageState.State stageState = 
jobStateManager.getStageState(stage.getId());
+            return stageState.equals(StageState.State.FAILED_RECOVERABLE)
+                || stageState.equals(StageState.State.READY);
+          })
+          .collect(Collectors.toList());
+      return (stagesToSchedule.isEmpty())
+          ? Optional.empty()
+          : Optional.of(stagesToSchedule);
+    } else {
+      // By the time the control flow has reached here,
+      // we are ready to move onto the next ScheduleGroup
+      final List<Stage> stagesToSchedule = reverseTopoStages
+          .stream()
+          .filter(stage -> {
+            if (stage.getScheduleGroupIndex() == referenceScheduleGroupIndex + 
1) {
+              final String stageId = stage.getId();
+              return jobStateManager.getStageState(stageId) != 
StageState.State.EXECUTING
+                  && jobStateManager.getStageState(stageId) != 
StageState.State.COMPLETE;
+            }
+            return false;
+          })
+          .collect(Collectors.toList());
+
+      if (stagesToSchedule.isEmpty()) {
+        LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip 
this", referenceScheduleGroupIndex + 1);
+        return Optional.empty();
       }
-    }
-    if (!allStagesComplete) {
-      LOG.info("There are remaining stages in the current schedule group, {}", 
currentScheduleGroupIndex);
-      return (stagesToSchedule.isEmpty()) ? Optional.empty() : 
Optional.of(stagesToSchedule);
-    }
 
-    // By the time the control flow has reached here,
-    // we are ready to move onto the next ScheduleGroup
-    stagesToSchedule =
-        physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage 
-> {
-          if (stage.getScheduleGroupIndex() == currentScheduleGroupIndex + 1) {
-            final String stageId = stage.getId();
-            return jobStateManager.getStageState(stageId) != 
StageState.State.EXECUTING
-                && jobStateManager.getStageState(stageId) != 
StageState.State.COMPLETE;
-          }
-          return false;
-        }).collect(Collectors.toList());
-
-    if (stagesToSchedule.isEmpty()) {
-      LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip 
this", currentScheduleGroupIndex + 1);
-      return Optional.empty();
+      return Optional.of(stagesToSchedule);
     }
-
-    // Return the schedulable stage list in reverse-topological order
-    // since the stages that belong to the same schedule group are mutually 
independent,
-    // or connected by a "push" edge, requiring the children stages to be 
scheduled first.
-    Collections.reverse(stagesToSchedule);
-    return Optional.of(stagesToSchedule);
   }
 
   /**
-   * Schedules the given stage.
-   * It adds the list of tasks for the stage where the scheduler thread 
continuously polls from.
    * @param stageToSchedule the stage to schedule.
    */
-  private void scheduleStage(final Stage stageToSchedule) {
+  private List<Task> getSchedulableTasks(final Stage stageToSchedule) {
     final List<StageEdge> stageIncomingEdges =
         physicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
     final List<StageEdge> stageOutgoingEdges =
         physicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
 
-    final StageState.State stageState = 
jobStateManager.getStageState(stageToSchedule.getId());
-
     final List<String> taskIdsToSchedule = new LinkedList<>();
     for (final String taskId : stageToSchedule.getTaskIds()) {
       // this happens when the belonging stage's other tasks have failed 
recoverable,
@@ -357,18 +342,9 @@ private void scheduleStage(final Stage stageToSchedule) {
         case EXECUTING:
           LOG.info("Skipping {} because its outputs are safe!", taskId);
           break;
-        case READY:
-          if (stageState == StageState.State.FAILED_RECOVERABLE) {
-            LOG.info("Skipping {} because it is already in the queue, but just 
hasn't been scheduled yet!",
-                taskId);
-          } else {
-            LOG.info("Scheduling {}", taskId);
-            taskIdsToSchedule.add(taskId);
-          }
-          break;
         case FAILED_RECOVERABLE:
-          LOG.info("Re-scheduling {} for failure recovery", taskId);
           jobStateManager.onTaskStateChanged(taskId, READY);
+        case READY:
           taskIdsToSchedule.add(taskId);
           break;
         case ON_HOLD:
@@ -384,13 +360,13 @@ private void scheduleStage(final Stage stageToSchedule) {
     // each readable and source task will be bounded in executor.
     final List<Map<String, Readable>> vertexIdToReadables = 
stageToSchedule.getVertexIdToReadables();
 
+    final List<Task> tasks = new ArrayList<>(taskIdsToSchedule.size());
     taskIdsToSchedule.forEach(taskId -> {
       blockManagerMaster.onProducerTaskScheduled(taskId);
       final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
       final int attemptIdx = jobStateManager.getTaskAttempt(taskId);
 
-      LOG.debug("Enqueueing {}", taskId);
-      pendingTaskCollection.add(new Task(
+      tasks.add(new Task(
           physicalPlan.getId(),
           taskId,
           attemptIdx,
@@ -400,7 +376,7 @@ private void scheduleStage(final Stage stageToSchedule) {
           stageOutgoingEdges,
           vertexIdToReadables.get(taskIdx)));
     });
-    schedulerRunner.onATaskAvailable();
+    return tasks;
   }
 
   /**
@@ -457,7 +433,7 @@ private void onTaskExecutionComplete(final String 
executorId,
     if 
(jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE))
 {
       // if the stage this task belongs to is complete,
       if (!jobStateManager.isJobDone()) {
-        scheduleNextStage(stageIdForTaskUponCompletion);
+        
scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageIdForTaskUponCompletion));
       }
     }
     schedulerRunner.onAnExecutorAvailable();
@@ -526,7 +502,7 @@ private void onTaskExecutionFailedRecoverable(final String 
executorId,
         // TODO #50: Carefully retry tasks in the scheduler
       case OUTPUT_WRITE_FAILURE:
         blockManagerMaster.onProducerTaskFailed(taskId);
-        scheduleNextStage(stageId);
+        scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageId));
         break;
       case CONTAINER_FAILURE:
         LOG.info("Only the failed task will be retried.");
@@ -536,4 +512,8 @@ private void onTaskExecutionFailedRecoverable(final String 
executorId,
     }
     schedulerRunner.onAnExecutorAvailable();
   }
+
+  private int getSchedulingIndexOfStage(final String stageId) {
+    return 
physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroupIndex();
+  }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
deleted file mode 100644
index a1fa3ee9..00000000
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Task;
-
-import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.tang.annotations.DefaultImplementation;
-
-import java.util.Collection;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-
-/**
- * Keep tracks of all pending tasks.
- * {@link Scheduler} enqueues the Tasks to schedule to this queue.
- * {@link SchedulerRunner} refers to this queue when scheduling Tasks.
- */
-@ThreadSafe
-@DriverSide
-@DefaultImplementation(SingleJobTaskCollection.class)
-public interface PendingTaskCollection {
-
-  /**
-   * Adds a Task to this collection.
-   * @param task to add.
-   */
-  void add(final Task task);
-
-  /**
-   * Removes the specified Task to be scheduled.
-   * @param taskId id of the Task
-   * @return the specified Task
-   * @throws NoSuchElementException if the specified Task is not in the queue,
-   *                                or removing this Task breaks scheduling 
order
-   */
-  Task remove(final String taskId) throws NoSuchElementException;
-
-  /**
-   * Peeks stage that can be scheduled according to job dependency priority.
-   * Changes to the queue must not reflected to the returned collection to 
avoid concurrent modification.
-   * @return stage that can be scheduled, or {@link Optional#empty()} if the 
queue is empty
-   */
-  Optional<Collection<Task>> peekSchedulableStage();
-
-  /**
-   * Registers a job to this queue in case the queue needs to understand the 
topology of the job DAG.
-   * @param physicalPlanForJob the job to schedule.
-   */
-  void onJobScheduled(final PhysicalPlan physicalPlanForJob);
-
-  /**
-   * Removes a stage and its descendant stages from this queue.
-   * This is to be used for fault tolerance purposes,
-   * say when a stage fails and all affected Tasks must be removed.
-   * @param stageIdOfTasks for the stage to begin the removal recursively.
-   */
-  void removeTasksAndDescendants(final String stageIdOfTasks);
-
-  /**
-   * Checks whether there are schedulable Tasks in the queue or not.
-   * @return true if there are schedulable Tasks in the queue, false otherwise.
-   */
-  boolean isEmpty();
-
-  /**
-   * Closes and cleans up this queue.
-   */
-  void close();
-}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
new file mode 100644
index 00000000..3fb08c69
--- /dev/null
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointer.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.Task;
+
+import net.jcip.annotations.ThreadSafe;
+
+import javax.inject.Inject;
+import java.util.Collection;
+import java.util.Optional;
+
+/**
+ * Points to a collection of pending tasks eligible for scheduling.
+ * This pointer effectively points to a subset of a scheduling group.
+ * Within the collection, the tasks can be scheduled in any order.
+ */
+@ThreadSafe
+public final class PendingTaskCollectionPointer {
+  private Collection<Task> curTaskCollection;
+
+  @Inject
+  public PendingTaskCollectionPointer() {
+  }
+
+  /**
+   * This collection of tasks should take precedence over any previous 
collection of tasks.
+   * @param newCollection to schedule.
+   */
+  synchronized void setToOverwrite(final Collection<Task> newCollection) {
+    this.curTaskCollection = newCollection;
+  }
+
+  /**
+   * This collection of tasks can be scheduled only if there's no collection 
of tasks to schedule at the moment.
+   * @param newCollection to schedule
+   */
+  synchronized void setIfNull(final Collection<Task> newCollection) {
+    if (this.curTaskCollection == null) {
+      this.curTaskCollection = newCollection;
+    }
+  }
+
+  /**
+   * Take the whole collection of tasks to schedule, and set the pointer to 
null.
+   * @return optional tasks to schedule
+   */
+  synchronized Optional<Collection<Task>> getAndSetNull() {
+    final Collection<Task> cur = curTaskCollection;
+    curTaskCollection = null;
+    return Optional.ofNullable(cur);
+  }
+}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index ebf79371..12114a75 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -27,7 +27,7 @@
 
 /**
  * Only two threads call scheduling code: RuntimeMaster thread (RMT), and 
SchedulerThread(ST).
- * RMT and ST meet only at two points: {@link ExecutorRegistry}, and {@link 
PendingTaskCollection},
+ * RMT and ST meet only at two points: {@link ExecutorRegistry}, and {@link 
PendingTaskCollectionPointer},
  * which are synchronized(ThreadSafe).
  * Other scheduler-related classes that are accessed by only one of the two 
threads are not synchronized(NotThreadSafe).
  */
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 243ef075..62af0408 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -25,8 +25,6 @@
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -38,91 +36,80 @@
 import javax.inject.Inject;
 
 /**
- * Takes a Task from the pending queue and schedules it to an executor.
+ * Schedules tasks in discrete batches (scheduling iterations).
+ * A scheduling iteration occurs under one of the following conditions:
+ * - An executor slot becomes available (for reasons such as task 
completion/failure, or executor addition)
+ * - A new list of tasks becomes available (for reasons such as stage 
completion, task failure, or executor removal)
  */
 @DriverSide
 @NotThreadSafe
 public final class SchedulerRunner {
   private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerRunner.class.getName());
   private final Map<String, JobStateManager> jobStateManagers;
-  private final PendingTaskCollection pendingTaskCollection;
+  private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorService schedulerThread;
-  private AtomicBoolean isSchedulerRunning;
+  private boolean isSchedulerRunning;
   private boolean isTerminated;
 
-  // (available executor AND available task to schedule) OR the scheduler has 
terminated
-  private final DelayedSignalingCondition canScheduleOrTerminated = new 
DelayedSignalingCondition();
+  private final DelayedSignalingCondition schedulingIteration = new 
DelayedSignalingCondition();
   private ExecutorRegistry executorRegistry;
   private SchedulingPolicy schedulingPolicy;
 
   @VisibleForTesting
   @Inject
   public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
-                         final PendingTaskCollection pendingTaskCollection,
+                         final PendingTaskCollectionPointer 
pendingTaskCollectionPointer,
                          final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
-    this.pendingTaskCollection = pendingTaskCollection;
+    this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new 
Thread(runnable, "SchedulerRunner"));
-    this.isSchedulerRunning = new AtomicBoolean(false);
+    this.isSchedulerRunning = false;
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
     this.schedulingPolicy = schedulingPolicy;
   }
 
   /**
-   * Signals to the condition on executor availability.
-   */
-  public void onAnExecutorAvailable() {
-    canScheduleOrTerminated.signal();
-  }
-
-  /**
-   * Signals to the condition on Task availability.
-   */
-  public void onATaskAvailable() {
-    canScheduleOrTerminated.signal();
-  }
-
-  /**
-   * Run the scheduler thread.
+   * A separate thread is run to schedule tasks to executors.
+   * See comments in the {@link Scheduler} for avoiding race conditions.
    */
-  void runSchedulerThread() {
-    if (!isTerminated) {
-      if (!isSchedulerRunning.getAndSet(true)) {
-        schedulerThread.execute(new SchedulerThread());
-        schedulerThread.shutdown();
+  private final class SchedulerThread implements Runnable {
+    @Override
+    public void run() {
+      while (!isTerminated) {
+        doScheduleTaskList();
+        schedulingIteration.await();
       }
+      jobStateManagers.values().forEach(jobStateManager -> {
+        if (jobStateManager.isJobDone()) {
+          LOG.info("{} is complete.", jobStateManager.getJobId());
+        } else {
+          LOG.info("{} is incomplete.", jobStateManager.getJobId());
+        }
+      });
+      LOG.info("SchedulerRunner Terminated!");
     }
   }
 
-  /**
-   * Begin scheduling a job.
-   * @param jobStateManager the corresponding {@link JobStateManager}
-   */
-  void scheduleJob(final JobStateManager jobStateManager) {
-    if (!isTerminated) {
-      jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
-    } // else ignore new incoming jobs when terminated.
-  }
-
-  void terminate() {
-    isTerminated = true;
-    canScheduleOrTerminated.signal();
-  }
-
-  void doScheduleStage() {
-    final Collection<Task> stageToSchedule = 
pendingTaskCollection.peekSchedulableStage().orElse(null);
-    if (stageToSchedule == null) {
-      // Task queue is empty
-      LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
+  void doScheduleTaskList() {
+    final Optional<Collection<Task>> taskListOptional = 
pendingTaskCollectionPointer.getAndSetNull();
+    if (!taskListOptional.isPresent()) {
+      // Task list is empty
+      LOG.debug("PendingTaskCollectionPointer is empty. Awaiting for more 
Tasks...");
       return;
     }
 
-    final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be 
incremented in lambda
-    for (final Task task : stageToSchedule) {
+    final Collection<Task> taskList = taskListOptional.get();
+    final List<Task> couldNotSchedule = new ArrayList<>();
+    for (final Task task : taskList) {
       final JobStateManager jobStateManager = 
jobStateManagers.get(task.getJobId());
-      LOG.debug("Trying to schedule {}...", task.getTaskId());
+      if 
(!jobStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) 
{
+        // Guard against race conditions causing duplicate task launches
+        LOG.debug("Skipping {} as it is not READY", task.getTaskId());
+        continue;
+      }
 
+      LOG.debug("Trying to schedule {}...", task.getTaskId());
       executorRegistry.viewExecutors(executors -> {
         final Set<ExecutorRepresenter> candidateExecutors =
             schedulingPolicy.filterExecutorRepresenters(executors, task);
@@ -131,59 +118,68 @@ void doScheduleStage() {
         if (firstCandidate.isPresent()) {
           // update metadata first
           jobStateManager.onTaskStateChanged(task.getTaskId(), 
TaskState.State.EXECUTING);
-          pendingTaskCollection.remove(task.getTaskId());
-          numScheduledTasks.incrementAndGet();
 
           // send the task
           final ExecutorRepresenter selectedExecutor = firstCandidate.get();
           selectedExecutor.onTaskScheduled(task);
-          LOG.debug("Successfully scheduled {}", task.getTaskId());
         } else {
-          LOG.debug("Failed to schedule {}", task.getTaskId());
+          couldNotSchedule.add(task);
         }
       });
     }
 
-    LOG.debug("Examined {} Tasks, scheduled {} Tasks", stageToSchedule.size(), 
numScheduledTasks);
-    if (stageToSchedule.size() == numScheduledTasks.get()) {
-      // Scheduled all tasks in the stage
-      // Immediately run next iteration to check whether there is another 
stage that can be scheduled
-      LOG.debug("Trying to schedule next Stage in the ScheduleGroup (if 
any)...");
-      canScheduleOrTerminated.signal();
+    LOG.debug("All except {} were scheduled among {}", new 
Object[]{couldNotSchedule, taskList});
+    if (couldNotSchedule.size() > 0) {
+      // Try these again, if no new task list has been set
+      pendingTaskCollectionPointer.setIfNull(couldNotSchedule);
     }
   }
 
   /**
-   * A separate thread is run to schedule tasks to executors.
-   * See comments in the {@link Scheduler} for avoiding race conditions.
+   * Signals to the condition on executor availability.
    */
-  private final class SchedulerThread implements Runnable {
-    @Override
-    public void run() {
-      // Run the first iteration unconditionally
-      canScheduleOrTerminated.signal();
+  void onAnExecutorAvailable() {
+    schedulingIteration.signal();
+  }
 
-      while (!isTerminated) {
-        // Iteration guard
-        canScheduleOrTerminated.await();
-        doScheduleStage();
-      }
-      jobStateManagers.values().forEach(jobStateManager -> {
-        if (jobStateManager.isJobDone()) {
-          LOG.info("{} is complete.", jobStateManager.getJobId());
-        } else {
-          LOG.info("{} is incomplete.", jobStateManager.getJobId());
-        }
-      });
-      LOG.info("SchedulerRunner Terminated!");
+  /**
+   * Signals to the condition on the Task collection availability.
+   */
+  void onNewPendingTaskCollectionAvailable() {
+    schedulingIteration.signal();
+  }
+
+  /**
+   * Run the scheduler thread.
+   */
+  void runSchedulerThread() {
+    if (!isTerminated && !isSchedulerRunning) {
+      schedulerThread.execute(new SchedulerThread());
+      schedulerThread.shutdown();
+      isSchedulerRunning = true;
     }
   }
 
+  /**
+   * Begin scheduling a job.
+   * @param jobStateManager the corresponding {@link JobStateManager}
+   */
+  void scheduleJob(final JobStateManager jobStateManager) {
+    if (!isTerminated) {
+      jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
+    } // else ignore new incoming jobs when terminated.
+  }
+
+  void terminate() {
+    isTerminated = true;
+    schedulingIteration.signal();
+  }
+
   /**
    * A {@link Condition} that allows 'delayed' signaling.
    */
   private final class DelayedSignalingCondition {
-    private final AtomicBoolean hasDelayedSignal = new AtomicBoolean(false);
+    private boolean hasDelayedSignal = false;
     private final Lock lock = new ReentrantLock();
     private final Condition condition = lock.newCondition();
 
@@ -191,10 +187,10 @@ public void run() {
      * Signals to this condition. If no thread is awaiting for this condition,
      * signaling is delayed until the first next {@link #await} invocation.
      */
-    public void signal() {
+    void signal() {
       lock.lock();
       try {
-        hasDelayedSignal.set(true);
+        hasDelayedSignal = true;
         condition.signal();
       } finally {
         lock.unlock();
@@ -205,13 +201,13 @@ public void signal() {
      * Awaits to this condition. The thread will awake when there is a delayed 
signal,
      * or the next first {@link #signal} invocation.
      */
-    public void await() {
+    void await() {
       lock.lock();
       try {
-        if (!hasDelayedSignal.get()) {
+        if (!hasDelayedSignal) {
           condition.await();
         }
-        hasDelayedSignal.set(false);
+        hasDelayedSignal = false;
       } catch (final InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
deleted file mode 100644
index ddcfbf55..00000000
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.plan.Task;
-import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.annotations.audience.DriverSide;
-
-import javax.inject.Inject;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * {@link PendingTaskCollection} implementation.
- * This class provides two-level scheduling by keeping track of schedulable 
stages and stage-Task membership.
- * {@link #peekSchedulableStage()} returns collection of Tasks which belong to 
one of the schedulable stages.
- */
-@ThreadSafe
-@DriverSide
-public final class SingleJobTaskCollection implements PendingTaskCollection {
-  private PhysicalPlan physicalPlan;
-
-  /**
-   * Pending Tasks awaiting to be scheduled for each stage.
-   */
-  private final ConcurrentMap<String, Map<String, Task>> stageIdToPendingTasks;
-
-  /**
-   * Stages with Tasks that have not yet been scheduled.
-   */
-  private final BlockingDeque<String> schedulableStages;
-
-  @Inject
-  public SingleJobTaskCollection() {
-    stageIdToPendingTasks = new ConcurrentHashMap<>();
-    schedulableStages = new LinkedBlockingDeque<>();
-  }
-
-  @Override
-  public synchronized void add(final Task task) {
-    final String stageId = 
RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
-
-    stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
-      if (taskIdToTask == null) {
-        final Map<String, Task> taskIdToTaskMap = new HashMap<>();
-        taskIdToTaskMap.put(task.getTaskId(), task);
-        updateSchedulableStages(stageId);
-        return taskIdToTaskMap;
-      } else {
-        taskIdToTask.put(task.getTaskId(), task);
-        return taskIdToTask;
-      }
-    });
-  }
-
-  /**
-   * Removes the specified Task to be scheduled.
-   * The specified Task should belong to the collection from {@link 
#peekSchedulableStage()}.
-   * @param taskId id of the Task
-   * @return the specified Task
-   * @throws NoSuchElementException if the specified Task is not in the queue,
-   *                                or removing this Task breaks scheduling 
order
-   *                                (i.e. does not belong to the collection 
from {@link #peekSchedulableStage()}.
-   */
-  @Override
-  public synchronized Task remove(final String taskId) throws 
NoSuchElementException {
-    final String stageId = schedulableStages.peekFirst();
-    if (stageId == null) {
-      throw new NoSuchElementException("No schedulable stage in Task queue");
-    }
-
-    final Map<String, Task> pendingTasksForStage = 
stageIdToPendingTasks.get(stageId);
-
-    if (pendingTasksForStage == null) {
-      throw new RuntimeException(String.format("Stage %s not found in Task 
queue", stageId));
-    }
-    final Task taskToSchedule = pendingTasksForStage.remove(taskId);
-    if (taskToSchedule == null) {
-      throw new NoSuchElementException(String.format("Task %s not found in 
Task queue", taskId));
-    }
-    if (pendingTasksForStage.isEmpty()) {
-      if (!schedulableStages.pollFirst().equals(stageId)) {
-        throw new RuntimeException(String.format("Expected stage %s to be 
polled", stageId));
-      }
-      stageIdToPendingTasks.remove(stageId);
-      stageIdToPendingTasks.forEach((scheduledStageId, tasks) ->
-          updateSchedulableStages(scheduledStageId));
-    }
-
-    return taskToSchedule;
-  }
-
-  /**
-   * Peeks Tasks that can be scheduled.
-   * @return Tasks to be scheduled, or {@link Optional#empty()} if the queue 
is empty
-   * @return collection of Tasks which belong to one of the schedulable stages
-   *         or {@link Optional#empty} if the queue is empty
-   */
-  @Override
-  public synchronized Optional<Collection<Task>> peekSchedulableStage() {
-    final String stageId = schedulableStages.peekFirst();
-    if (stageId == null) {
-      return Optional.empty();
-    }
-
-    final Map<String, Task> pendingTasksForStage = 
stageIdToPendingTasks.get(stageId);
-    if (pendingTasksForStage == null) {
-      throw new RuntimeException(String.format("Stage %s not found in 
stageIdToPendingTasks map", stageId));
-    }
-    return Optional.of(new ArrayList<>(pendingTasksForStage.values()));
-  }
-
-  /**
-   * Removes a stage and its descendant stages from this PQ.
-   * @param stageId for the stage to begin the removal recursively.
-   */
-  @Override
-  public synchronized void removeTasksAndDescendants(final String stageId) {
-    removeStageAndChildren(stageId);
-  }
-
-  /**
-   * Recursively removes a stage and its children stages from this PQ.
-   * @param stageId for the stage to begin the removal recursively.
-   */
-  private synchronized void removeStageAndChildren(final String stageId) {
-    if (schedulableStages.remove(stageId)) {
-      stageIdToPendingTasks.remove(stageId);
-    }
-
-    physicalPlan.getStageDAG().getChildren(stageId).forEach(
-        stage -> removeStageAndChildren(stage.getId()));
-  }
-
-  /**
-   * Updates the two-level data structure by examining a new candidate stage.
-   * If there are no stages with higher priority, the candidate can be made 
schedulable.
-   *
-   * NOTE: This method provides the "line up" between stages, by assigning 
priorities,
-   * serving as the key to the "priority" implementation of this class.
-   * @param candidateStageId for the stage that can potentially be scheduled.
-   */
-  private synchronized void updateSchedulableStages(final String 
candidateStageId) {
-    final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
-
-    if (isSchedulable(candidateStageId)) {
-      // Check for ancestor stages that became schedulable due to 
candidateStage's absence from the queue.
-      jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
-        if (schedulableStages.contains(ancestorStage.getId())) {
-          if (!schedulableStages.remove(ancestorStage.getId())) {
-            throw new RuntimeException(String.format("No such stage: %s", 
ancestorStage.getId()));
-          }
-        }
-      });
-      if (!schedulableStages.contains(candidateStageId)) {
-        schedulableStages.addLast(candidateStageId);
-      }
-    }
-  }
-
-  /**
-   * Determines whether the given candidate stage is schedulable immediately 
or not.
-   * @param candidateStageId for the stage that can potentially be scheduled.
-   * @return true if schedulable, false otherwise.
-   */
-  private synchronized boolean isSchedulable(final String candidateStageId) {
-    final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
-    for (final Stage descendantStage : 
jobDAG.getDescendants(candidateStageId)) {
-      if (schedulableStages.contains(descendantStage.getId())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public synchronized void onJobScheduled(final PhysicalPlan 
physicalPlanForJob) {
-    this.physicalPlan = physicalPlanForJob;
-  }
-
-  @Override
-  public synchronized boolean isEmpty() {
-    return schedulableStages.isEmpty();
-  }
-
-  @Override
-  public synchronized void close() {
-    schedulableStages.clear();
-    stageIdToPendingTasks.clear();
-  }
-}
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index f0f5d191..ba8a2f5e 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -67,7 +67,7 @@
   private SchedulerRunner schedulerRunner;
   private ExecutorRegistry executorRegistry;
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskCollection pendingTaskCollection;
+  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
@@ -85,13 +85,13 @@ public void setUp() throws Exception {
 
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
-    pendingTaskCollection = new SingleJobTaskCollection();
+    pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
     schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskCollection, executorRegistry);
+    schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskCollectionPointer, executorRegistry);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
-        new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollection,
+        new BatchSingleJobScheduler(schedulerRunner, 
pendingTaskCollectionPointer,
             blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler, executorRegistry);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
index d2ec9629..0e57dc5b 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
@@ -66,7 +66,7 @@
   private ExecutorRegistry executorRegistry;
 
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskCollection pendingTaskCollection;
+  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
@@ -87,15 +87,15 @@ private Scheduler setUpScheduler(final boolean 
useMockSchedulerRunner) throws In
     final Injector injector = Tang.Factory.getTang().newInjector();
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
 
-    pendingTaskCollection = new SingleJobTaskCollection();
+    pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
     schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
 
     if (useMockSchedulerRunner) {
       schedulerRunner = mock(SchedulerRunner.class);
     } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskCollection, executorRegistry);
+      schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskCollectionPointer, executorRegistry);
     }
-    return new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollection, 
blockManagerMaster,
+    return new BatchSingleJobScheduler(schedulerRunner, 
pendingTaskCollectionPointer, blockManagerMaster,
         pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
   }
 
@@ -137,23 +137,22 @@ public void testContainerRemoval() throws Exception {
       if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() 
== 1) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in 
ScheduleGroup 0 and 1.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 2) {
         scheduler.onExecutorRemoved("a3");
         // There are 2 executors, each of capacity 2, and there are 2 Tasks in 
ScheduleGroup 2.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         // Due to round robin scheduling, "a2" is assured to have a running 
Task.
         scheduler.onExecutorRemoved("a2");
 
         // Re-schedule
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
@@ -161,16 +160,15 @@ public void testContainerRemoval() throws Exception {
         assertTrue(maxTaskAttempt.isPresent());
         assertEquals(2, (int) maxTaskAttempt.get());
 
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 3) {
         // There are 1 executors, each of capacity 2, and there are 2 Tasks in 
ScheduleGroup 3.
         // Schedule only the first Task
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, true);
       } else {
         throw new RuntimeException(String.format("Unexpected 
ScheduleGroupIndex: %d",
@@ -215,24 +213,22 @@ public void testOutputFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() 
== 1) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in 
ScheduleGroup 0 and 1.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 2) {
         // There are 3 executors, each of capacity 2, and there are 2 Tasks in 
ScheduleGroup 2.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
                 taskId, TaskState.State.FAILED_RECOVERABLE, 1,
                 TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
 
         // Re-schedule
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
@@ -240,7 +236,6 @@ public void testOutputFailure() throws Exception {
         assertTrue(maxTaskAttempt.isPresent());
         assertEquals(2, (int) maxTaskAttempt.get());
 
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             assertEquals(TaskState.State.EXECUTING, 
jobStateManager.getTaskState(taskId)));
       }
@@ -283,15 +278,14 @@ public void testInputReadFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() 
== 1) {
 
         // There are 3 executors, each of capacity 2, and there are 6 Tasks in 
ScheduleGroup 0 and 1.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskCollection.isEmpty());
         stage.getTaskIds().forEach(taskId ->
             SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
                 taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 2) {
         // There are 3 executors, each of capacity 2, and there are 2 Tasks in 
ScheduleGroup 2.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         stage.getTaskIds().forEach(taskId ->
@@ -300,7 +294,7 @@ public void testInputReadFailure() throws Exception {
                 TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
 
         // Re-schedule
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollection, 
schedulingPolicy, jobStateManager,
+        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointerTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointerTest.java
new file mode 100644
index 00000000..46cf822f
--- /dev/null
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollectionPointerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.Task;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests {@link PendingTaskCollectionPointer}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Task.class})
+public final class PendingTaskCollectionPointerTest {
+  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
+
+  private List<Task> mockTaskList() {
+    final Task task = mock(Task.class);
+    return Arrays.asList(task);
+  }
+
+  @Before
+  public void setUp() {
+    this.pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
+  }
+
+  @Test
+  public void nullByDefault() {
+    assertFalse(pendingTaskCollectionPointer.getAndSetNull().isPresent());
+  }
+
+  @Test
+  public void setIfNull() {
+    final List<Task> taskList = mockTaskList();
+    pendingTaskCollectionPointer.setIfNull(taskList);
+    final Optional<Collection<Task>> optional = 
pendingTaskCollectionPointer.getAndSetNull();
+    assertTrue(optional.isPresent());
+    assertEquals(taskList, optional.get());
+  }
+
+  @Test
+  public void setToOverwrite() {
+    final List<Task> taskList1 = mockTaskList();
+    pendingTaskCollectionPointer.setIfNull(taskList1);
+    final List<Task> taskList2 = mockTaskList();
+    pendingTaskCollectionPointer.setToOverwrite(taskList2);
+    final Optional<Collection<Task>> optional = 
pendingTaskCollectionPointer.getAndSetNull();
+    assertTrue(optional.isPresent());
+    assertEquals(taskList2, optional.get());
+  }
+}
+
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index cb15f7ad..41f9642c 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -16,11 +16,14 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -100,20 +103,14 @@ static void sendTaskStateEventToScheduler(final Scheduler scheduler,
     sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, newState, attemptIdx, null);
   }
 
-  static void mockSchedulingBySchedulerRunner(final PendingTaskCollection pendingTaskCollection,
+  static void mockSchedulingBySchedulerRunner(final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                                               final SchedulingPolicy schedulingPolicy,
                                               final JobStateManager jobStateManager,
                                               final ExecutorRegistry executorRegistry,
                                               final boolean scheduleOnlyTheFirstStage) {
     final SchedulerRunner schedulerRunner =
-        new SchedulerRunner(schedulingPolicy, pendingTaskCollection, executorRegistry);
+        new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
     schedulerRunner.scheduleJob(jobStateManager);
-    while (!pendingTaskCollection.isEmpty()) {
-      schedulerRunner.doScheduleStage();
-      if (scheduleOnlyTheFirstStage) {
-        // Schedule only the first stage
-        break;
-      }
-    }
+    schedulerRunner.doScheduleTaskList();
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
deleted file mode 100644
index bb48746a..00000000
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.Task;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests {@link SingleJobTaskCollection}.
- */
-public final class SingleTaskQueueTest {
-  private SingleJobTaskCollection pendingTaskPriorityQueue;
-
-  /**
-   * To be used for a thread pool to execute tasks.
-   */
-  private ExecutorService executorService;
-
-  @Before
-  public void setUp() throws Exception{
-    pendingTaskPriorityQueue = new SingleJobTaskCollection();
-    executorService = Executors.newFixedThreadPool(2);
-  }
-
-  /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
-   * Tests whether the dequeued Tasks are according to the stage-dependency priority.
-   */
-  @Test
-  public void testPushPriority() throws Exception {
-    final PhysicalPlan physicalPlan =
-        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, true);
-
-    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
-
-    // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
-    assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
-
-    final CountDownLatch countDownLatch = new CountDownLatch(2);
-
-    final AtomicBoolean passed = new AtomicBoolean(true);
-
-    // This mimics Batch Scheduler's behavior
-    executorService.submit(() -> {
-      // First schedule the children Tasks (since it is push).
-      // BatchSingleJobScheduler will schedule Tasks in this order as well.
-      scheduleStage(dagOf2Stages.get(1));
-      // Then, schedule the parent Tasks.
-      scheduleStage(dagOf2Stages.get(0));
-
-      countDownLatch.countDown();
-    }).get();
-
-    // This mimics SchedulerRunner's behavior
-    executorService.submit(() -> {
-      try {
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final Task dequeuedTask = dequeue();
-        assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
-            dagOf2Stages.get(1).getId());
-
-        // Let's say we fail to schedule, and add this Task back.
-        pendingTaskPriorityQueue.add(dequeuedTask);
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-
-        // Now that we've dequeued all of the children Tasks, we should now start getting the parents.
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-      } catch (Exception e) {
-        e.printStackTrace();
-        passed.getAndSet(false);
-      } finally {
-        countDownLatch.countDown();
-      }
-    }).get();
-
-    countDownLatch.await();
-    assertTrue(passed.get());
-  }
-
-  /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
-   * Tests whether the dequeued Tasks are according to the stage-dependency priority.
-   */
-  @Test
-  public void testPullPriority() throws Exception {
-    final PhysicalPlan physicalPlan =
-        TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, false);
-    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
-
-    // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
-    assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0);
-    assertEquals(dagOf2Stages.get(1).getScheduleGroupIndex(), 1);
-
-    final CountDownLatch countDownLatch = new CountDownLatch(2);
-
-    // This mimics Batch Scheduler's behavior
-    executorService.submit(() -> {
-      // First schedule the parent Tasks (since it is pull).
-      // BatchSingleJobScheduler will schedule Tasks in this order as well.
-      scheduleStage(dagOf2Stages.get(0));
-      countDownLatch.countDown();
-    }).get();
-
-    // This mimics SchedulerRunner's behavior
-    executorService.submit(() -> {
-      try {
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final Task dequeuedTask = dequeue();
-        assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
-            dagOf2Stages.get(0).getId());
-
-        // Let's say we fail to schedule, and add this Task back.
-        pendingTaskPriorityQueue.add(dequeuedTask);
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        // Now that we've dequeued all of the children Tasks, we should now schedule children.
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        // Schedule the children Tasks.
-        scheduleStage(dagOf2Stages.get(1));
-      } catch (Exception e) {
-        e.printStackTrace();
-      } finally {
-        countDownLatch.countDown();
-      }
-    }).get();
-
-    countDownLatch.await();
-  }
-
-  /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
-   * Tests whether the dequeued Tasks are according to the stage-dependency priority,
-   * while concurrently scheduling Tasks that have dependencies, but are of different container types.
-   */
-  @Test
-  public void testWithDifferentContainerType() throws Exception {
-    final PhysicalPlan physicalPlan = TestPlanGenerator.generatePhysicalPlan(
-        TestPlanGenerator.PlanType.ThreeSequentialVerticesWithDifferentContainerTypes, true);
-    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
-    final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
-
-    // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
-    assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
-
-    final CountDownLatch countDownLatch = new CountDownLatch(2);
-
-    // First schedule the children Tasks (since it is push).
-    // BatchSingleJobScheduler will schedule Tasks in this order as well.
-    scheduleStage(dagOf2Stages.get(1));
-    // Then, schedule the parent Tasks.
-    scheduleStage(dagOf2Stages.get(0));
-
-    countDownLatch.countDown();
-
-    // This mimics SchedulerRunner's behavior.
-    executorService.submit(() -> {
-      try {
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-      } catch (Exception e) {
-        e.printStackTrace();
-      } finally {
-        countDownLatch.countDown();
-      }
-    }).get();
-
-    countDownLatch.await();
-  }
-
-  /**
-   * Schedule the tasks in a stage.
-   * @param stage the stage to schedule.
-   */
-  private void scheduleStage(final Stage stage) {
-    stage.getTaskIds().forEach(taskId ->
-        pendingTaskPriorityQueue.add(new Task(
-            "TestPlan",
-            taskId,
-            0,
-            stage.getExecutionProperties(),
-            stage.getSerializedIRDAG(),
-            Collections.emptyList(),
-            Collections.emptyList(),
-            Collections.emptyMap())));
-  }
-
-  /**
-   * Dequeues a scheduled task from the task priority queue and get it's stage name.
-   * @return the stage name of the dequeued task.
-   */
-  private String dequeueAndGetStageId() {
-    final Task task = dequeue();
-    return RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
-  }
-
-  /**
-   * Dequeues a scheduled task from the task priority queue.
-   * @return the Task dequeued
-   */
-  private Task dequeue() {
-    final Collection<Task> tasks
-        = pendingTaskPriorityQueue.peekSchedulableStage().get();
-    return pendingTaskPriorityQueue.remove(tasks.iterator().next().getTaskId());
-  }
-}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index ab1beb7b..5d24f812 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -63,8 +63,9 @@ public void testSailfish() {
                 edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
             assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
                 edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
-            assertEquals(BytesDecoderFactory.of(),
-                edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+            // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
+            //assertEquals(BytesDecoderFactory.of(),
+            //    edgeToMerger.getPropertyValue(DecoderProperty.class).get());
           } else {
             assertEquals(DataFlowModelProperty.Value.Pull,
                 edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
@@ -77,8 +78,9 @@ public void testSailfish() {
               edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
           assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
               edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
-          assertEquals(BytesEncoderFactory.of(),
-              edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+          // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
+          //assertEquals(BytesEncoderFactory.of(),
+          //    edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
         });
       } else {
         // Non merger vertex.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to