jeongyooneo closed pull request #21: [NEMO-46] Make the operations on ExecutorRegistry atomic
URL: https://github.com/apache/incubator-nemo/pull/21
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff is
reproduced below for the sake of provenance:

Because this is a foreign pull request (i.e., one opened from a fork), GitHub would not
otherwise display its diff, so the diff is supplied below:

diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 61f7d6e4..80ab1ea9 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -65,6 +65,12 @@
   private final Map<String, StageState> idToStageStates;
   private final Map<String, TaskState> idToTaskStates;
 
+  /**
+   * Maintain the number of schedule attempts for each task.
+   * The attempt numbers are updated only here, and are read-only in other 
places.
+   */
+  private final Map<String, Integer> taskIdToCurrentAttempt;
+
   /**
    * Keeps track of the number of schedule attempts for each stage.
    */
@@ -111,6 +117,7 @@ public JobStateManager(final PhysicalPlan physicalPlan,
     this.jobState = new JobState();
     this.idToStageStates = new HashMap<>();
     this.idToTaskStates = new HashMap<>();
+    this.taskIdToCurrentAttempt = new HashMap<>();
     this.scheduleAttemptIdxByStage = new HashMap<>();
     this.stageIdToRemainingTaskSet = new HashMap<>();
     this.currentJobStageIds = new HashSet<>();
@@ -133,6 +140,7 @@ private void initializeComputationStates() {
       idToStageStates.put(physicalStage.getId(), new StageState());
       physicalStage.getTaskIds().forEach(taskId -> {
         idToTaskStates.put(taskId, new TaskState());
+        taskIdToCurrentAttempt.put(taskId, 1);
       });
     });
   }
@@ -274,6 +282,7 @@ public synchronized void onStageStateChanged(final String 
stageId, final StageSt
   public synchronized void onTaskStateChanged(final String taskId, final 
TaskState.State newState) {
     final StateMachine taskState = 
idToTaskStates.get(taskId).getStateMachine();
     final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
+
     LOG.debug("Task State Transition: id {}, from {} to {}",
         new Object[]{taskId, taskState.getCurrentState(), newState});
     final Map<String, Object> metric = new HashMap<>();
@@ -323,6 +332,9 @@ public synchronized void onTaskStateChanged(final String 
taskId, final TaskState
           throw new IllegalStateTransitionException(
               new Throwable("The stage has not yet been submitted for 
execution"));
         }
+
+        // We'll recover and retry this task
+        taskIdToCurrentAttempt.put(taskId, taskIdToCurrentAttempt.get(taskId) 
+ 1);
       } else {
         LOG.info("{} state is already FAILED_RECOVERABLE. Skipping this 
event.",
             taskId);
@@ -358,6 +370,14 @@ public synchronized int getAttemptCountForStage(final 
String stageId) {
     }
   }
 
+  public synchronized int getCurrentAttemptIndexForTask(final String taskId) {
+    if (taskIdToCurrentAttempt.containsKey(taskId)) {
+      return taskIdToCurrentAttempt.get(taskId);
+    } else {
+      throw new IllegalStateException("No mapping for this task's attemptIdx, 
an inconsistent state occurred.");
+    }
+  }
+
   /**
    * Wait for this job to be finished and return the final state.
    * @return the final state of this job.
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index e2825095..8d224244 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -53,6 +53,7 @@
 
 /**
  * (WARNING) Use runtimeMasterThread for all public methods to avoid race 
conditions.
+ * See comments in the {@link Scheduler} for avoiding race conditions.
  *
  * Runtime Master is the central controller of Runtime.
  * Compiler submits an {@link PhysicalPlan} to Runtime Master to execute a job.
@@ -265,8 +266,8 @@ private void handleControlMessage(final 
ControlMessage.Message message) {
 
         scheduler.onTaskStateChanged(taskStateChangedMsg.getExecutorId(),
             taskStateChangedMsg.getTaskId(),
-            convertTaskState(taskStateChangedMsg.getState()),
             taskStateChangedMsg.getAttemptIdx(),
+            convertTaskState(taskStateChangedMsg.getState()),
             taskStateChangedMsg.getTaskPutOnHoldId(),
             convertFailureCause(taskStateChangedMsg.getFailureCause()));
         break;
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index d9641f48..caf16061 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -25,12 +25,14 @@
 import org.apache.reef.driver.context.ActiveContext;
 
 import javax.annotation.concurrent.NotThreadSafe;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 /**
- * (WARNING) This class is not thread-safe.
+ * (WARNING) This class is not thread-safe, and thus should only be accessed 
through ExecutorRegistry.
  *
  * Contains information/state regarding an executor.
  * Such information may include:
@@ -46,6 +48,7 @@
   private final String executorId;
   private final ResourceSpecification resourceSpecification;
   private final Set<String> runningTasks;
+  private final Map<String, Integer> runningTaskToAttempt;
   private final Set<String> completeTasks;
   private final Set<String> failedTasks;
   private final MessageSender<ControlMessage.Message> messageSender;
@@ -72,6 +75,7 @@ public ExecutorRepresenter(final String executorId,
     this.resourceSpecification = resourceSpecification;
     this.messageSender = messageSender;
     this.runningTasks = new HashSet<>();
+    this.runningTaskToAttempt = new HashMap<>();
     this.completeTasks = new HashSet<>();
     this.failedTasks = new HashSet<>();
     this.activeContext = activeContext;
@@ -82,9 +86,11 @@ public ExecutorRepresenter(final String executorId,
   /**
    * Marks all Tasks which were running in this executor as failed.
    */
-  public void onExecutorFailed() {
-    runningTasks.forEach(taskId -> failedTasks.add(taskId));
+  public Set<String> onExecutorFailed() {
+    failedTasks.addAll(runningTasks);
+    final Set<String> snapshot = new HashSet<>(runningTasks);
     runningTasks.clear();
+    return snapshot;
   }
 
   /**
@@ -93,6 +99,7 @@ public void onExecutorFailed() {
    */
   public void onTaskScheduled(final ScheduledTask scheduledTask) {
     runningTasks.add(scheduledTask.getTaskId());
+    runningTaskToAttempt.put(scheduledTask.getTaskId(), 
scheduledTask.getAttemptIdx());
     failedTasks.remove(scheduledTask.getTaskId());
 
     serializationExecutorService.submit(new Runnable() {
@@ -123,10 +130,11 @@ public void sendControlMessage(final 
ControlMessage.Message message) {
 
   /**
    * Marks the specified Task as completed.
-   * @param taskId id of the Task
+   *
    */
   public void onTaskExecutionComplete(final String taskId) {
     runningTasks.remove(taskId);
+    runningTaskToAttempt.remove(taskId);
     completeTasks.add(taskId);
   }
 
@@ -136,6 +144,7 @@ public void onTaskExecutionComplete(final String taskId) {
    */
   public void onTaskExecutionFailed(final String taskId) {
     runningTasks.remove(taskId);
+    runningTaskToAttempt.remove(taskId);
     failedTasks.add(taskId);
   }
 
@@ -153,9 +162,13 @@ public int getExecutorCapacity() {
     return runningTasks;
   }
 
+  public Map<String, Integer> getRunningTaskToAttempt() {
+    return runningTaskToAttempt;
+  }
+
   /**
    * @return set of ids of Tasks that have been failed in this exeuctor
-   */
+
   public Set<String> getFailedTasks() {
     return failedTasks;
   }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index bdc265d9..a74ff10c 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -49,18 +49,15 @@
  * (i.e., runtimeMasterThread in RuntimeMaster)
  *
  * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute 
and schedules the Tasks.
- * The policy by which it schedules them is dependent on the implementation of 
{@link SchedulingPolicy}.
  */
 @DriverSide
 @NotThreadSafe
 public final class BatchSingleJobScheduler implements Scheduler {
   private static final Logger LOG = 
LoggerFactory.getLogger(BatchSingleJobScheduler.class.getName());
-  private static final int SCHEDULE_ATTEMPT_ON_CONTAINER_FAILURE = 
Integer.MAX_VALUE;
 
   /**
    * Components related to scheduling the given job.
    */
-  private final SchedulingPolicy schedulingPolicy;
   private final SchedulerRunner schedulerRunner;
   private final PendingTaskCollection pendingTaskCollection;
   private final ExecutorRegistry executorRegistry;
@@ -79,14 +76,12 @@
   private int initialScheduleGroup;
 
   @Inject
-  public BatchSingleJobScheduler(final SchedulingPolicy schedulingPolicy,
-                                 final SchedulerRunner schedulerRunner,
+  public BatchSingleJobScheduler(final SchedulerRunner schedulerRunner,
                                  final PendingTaskCollection 
pendingTaskCollection,
                                  final BlockManagerMaster blockManagerMaster,
                                  final PubSubEventHandlerWrapper 
pubSubEventHandlerWrapper,
                                  final UpdatePhysicalPlanEventHandler 
updatePhysicalPlanEventHandler,
                                  final ExecutorRegistry executorRegistry) {
-    this.schedulingPolicy = schedulingPolicy;
     this.schedulerRunner = schedulerRunner;
     this.pendingTaskCollection = pendingTaskCollection;
     this.blockManagerMaster = blockManagerMaster;
@@ -110,6 +105,7 @@ public void scheduleJob(final PhysicalPlan jobToSchedule, 
final JobStateManager
     this.jobStateManager = scheduledJobStateManager;
 
     schedulerRunner.scheduleJob(scheduledJobStateManager);
+    schedulerRunner.runSchedulerThread();
     pendingTaskCollection.onJobScheduled(physicalPlan);
 
     LOG.info("Job to schedule: {}", jobToSchedule.getId());
@@ -132,65 +128,83 @@ public void updateJob(final String jobId, final 
PhysicalPlan newPhysicalPlan, fi
   }
 
   /**
-   * Receives a {@link 
edu.snu.nemo.runtime.common.comm.ControlMessage.TaskStateChangedMsg} from an 
executor.
-   * The message is received via communicator where this method is called.
+   * Handles task state transition notifications sent from executors.
+   * Note that we can receive notifications for previous task attempts, due to 
the nature of asynchronous events.
+   * We ignore such late-arriving notifications, and only handle notifications 
for the current task attempt.
+   *
    * @param executorId the id of the executor where the message was sent from.
    * @param taskId whose state has changed
+   * @param taskAttemptIndex of the task whose state has changed
    * @param newState the state to change to
    * @param taskPutOnHold the ID of task that are put on hold. It is null 
otherwise.
    */
   @Override
-  public void onTaskStateChanged(final String executorId, final String taskId,
-                                 final TaskState.State newState, final int 
attemptIdx,
+  public void onTaskStateChanged(final String executorId,
+                                 final String taskId,
+                                 final int taskAttemptIndex,
+                                 final TaskState.State newState,
                                  @Nullable final String taskPutOnHold,
                                  final TaskState.RecoverableFailureCause 
failureCause) {
-    switch (newState) {
-      case COMPLETE:
-        jobStateManager.onTaskStateChanged(taskId, newState);
-        onTaskExecutionComplete(executorId, taskId);
-        break;
-      case FAILED_RECOVERABLE:
-        onTaskExecutionFailedRecoverable(executorId, taskId, attemptIdx, 
newState, failureCause);
-        break;
-      case ON_HOLD:
-        jobStateManager.onTaskStateChanged(taskId, newState);
-        onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
-        break;
-      case FAILED_UNRECOVERABLE:
-        throw new UnrecoverableFailureException(new Exception(new 
StringBuffer().append("The job failed on Task #")
-            .append(taskId).append(" in Executor 
").append(executorId).toString()));
-      case READY:
-      case EXECUTING:
-        throw new IllegalStateTransitionException(
-            new Exception("The states READY/EXECUTING cannot occur at this 
point"));
-      default:
-        throw new UnknownExecutionStateException(new Exception("This TaskState 
is unknown: " + newState));
+    final int currentTaskAttemptIndex = 
jobStateManager.getCurrentAttemptIndexForTask(taskId);
+    if (taskAttemptIndex == currentTaskAttemptIndex) {
+      // Do change state, as this notification is for the current task attempt.
+      switch (newState) {
+        case COMPLETE:
+          jobStateManager.onTaskStateChanged(taskId, newState);
+          onTaskExecutionComplete(executorId, taskId);
+          break;
+        case FAILED_RECOVERABLE:
+          onTaskExecutionFailedRecoverable(executorId, taskId, newState, 
failureCause);
+          break;
+        case ON_HOLD:
+          jobStateManager.onTaskStateChanged(taskId, newState);
+          onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
+          break;
+        case FAILED_UNRECOVERABLE:
+          throw new UnrecoverableFailureException(new Exception(new 
StringBuffer().append("The job failed on Task #")
+              .append(taskId).append(" in Executor 
").append(executorId).toString()));
+        case READY:
+        case EXECUTING:
+          throw new IllegalStateTransitionException(
+              new Exception("The states READY/EXECUTING cannot occur at this 
point"));
+        default:
+          throw new UnknownExecutionStateException(new Exception("This 
TaskState is unknown: " + newState));
+      }
+    } else if (taskAttemptIndex < currentTaskAttemptIndex) {
+      // Do not change state, as this notification is for a previous task 
attempt.
+      // For example, the master can receive a notification that an executor 
has been removed,
+      // and then a notification that the task that was running in the removed 
executor has been completed.
+      // In this case, if we do not consider the attempt number, the state 
changes from FAILED_RECOVERABLE to COMPLETED,
+      // which is illegal.
+      LOG.info("{} state change to {} arrived late, we will ignore this.", new 
Object[]{taskId, newState});
+    } else {
+      throw new SchedulingException(new Throwable("AttemptIdx for a task 
cannot be greater than its current index"));
     }
   }
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    executorRegistry.registerRepresenter(executorRepresenter);
+    executorRegistry.registerExecutor(executorRepresenter);
     schedulerRunner.onAnExecutorAvailable();
   }
 
   @Override
   public void onExecutorRemoved(final String executorId) {
     final Set<String> tasksToReExecute = new HashSet<>();
-
     // Tasks for lost blocks
     tasksToReExecute.addAll(blockManagerMaster.removeWorker(executorId));
 
     // Tasks executing on the removed executor
-    executorRegistry.setRepresenterAsFailed(executorId);
-    final ExecutorRepresenter executor = 
executorRegistry.getFailedExecutorRepresenter(executorId);
-    executor.onExecutorFailed();
-    tasksToReExecute.addAll(executor.getFailedTasks());
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      tasksToReExecute.addAll(executor.onExecutorFailed());
+      return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
+    });
 
-    tasksToReExecute.forEach(failedTaskId ->
-        onTaskStateChanged(executorId, failedTaskId, 
TaskState.State.FAILED_RECOVERABLE,
-            SCHEDULE_ATTEMPT_ON_CONTAINER_FAILURE, null,
-            TaskState.RecoverableFailureCause.CONTAINER_FAILURE));
+    tasksToReExecute.forEach(failedTaskId -> {
+      final int attemptIndex = 
jobStateManager.getCurrentAttemptIndexForTask(failedTaskId);
+      onTaskStateChanged(executorId, failedTaskId, attemptIndex, 
TaskState.State.FAILED_RECOVERABLE,
+          null, TaskState.RecoverableFailureCause.CONTAINER_FAILURE);
+    });
 
     if (!tasksToReExecute.isEmpty()) {
       // Schedule a stage after marking the necessary tasks to 
failed_recoverable.
@@ -203,6 +217,7 @@ public void onExecutorRemoved(final String executorId) {
   @Override
   public void terminate() {
     this.schedulerRunner.terminate();
+    this.executorRegistry.terminate();
     this.pendingTaskCollection.close();
   }
 
@@ -374,8 +389,7 @@ private void scheduleStage(final PhysicalStage 
stageToSchedule) {
 
     // attemptIdx is only initialized/updated when we set the stage's state to 
executing
     jobStateManager.onStageStateChanged(stageToSchedule.getId(), 
StageState.State.EXECUTING);
-    final int attemptIdx = 
jobStateManager.getAttemptCountForStage(stageToSchedule.getId());
-    LOG.info("Scheduling Stage {} with attemptIdx={}", new 
Object[]{stageToSchedule.getId(), attemptIdx});
+    LOG.info("Scheduling Stage {}", stageToSchedule.getId());
 
     // each readable and source task will be bounded in executor.
     final List<Map<String, Readable>> logicalTaskIdToReadables = 
stageToSchedule.getLogicalTaskIdToReadables();
@@ -383,7 +397,9 @@ private void scheduleStage(final PhysicalStage 
stageToSchedule) {
     taskIdsToSchedule.forEach(taskId -> {
       blockManagerMaster.onProducerTaskScheduled(taskId);
       final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
-      LOG.debug("Enquing {}", taskId);
+      final int attemptIdx = 
jobStateManager.getCurrentAttemptIndexForTask(taskId);
+
+      LOG.debug("Enqueueing {}", taskId);
       pendingTaskCollection.add(new ScheduledTask(physicalPlan.getId(),
           stageToSchedule.getSerializedTaskDag(), taskId, stageIncomingEdges, 
stageOutgoingEdges, attemptIdx,
           stageToSchedule.getContainerType(), 
logicalTaskIdToReadables.get(taskIdx)));
@@ -437,7 +453,10 @@ private void onTaskExecutionComplete(final String 
executorId,
                                        final Boolean isOnHoldToComplete) {
     LOG.debug("{} completed in {}", new Object[]{taskId, executorId});
     if (!isOnHoldToComplete) {
-      
executorRegistry.getRunningExecutorRepresenter(executorId).onTaskExecutionComplete(taskId);
+      executorRegistry.updateExecutor(executorId, (executor, state) -> {
+        executor.onTaskExecutionComplete(taskId);
+        return Pair.of(executor, state);
+      });
     }
 
     final String stageIdForTaskUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskId(taskId);
@@ -460,7 +479,10 @@ private void onTaskExecutionOnHold(final String executorId,
                                      final String taskId,
                                      final String taskPutOnHold) {
     LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
-    
executorRegistry.getRunningExecutorRepresenter(executorId).onTaskExecutionComplete(taskId);
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      executor.onTaskExecutionComplete(taskId);
+      return Pair.of(executor, state);
+    });
     final String stageIdForTaskUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskId(taskId);
 
     final boolean stageComplete =
@@ -492,49 +514,43 @@ private void onTaskExecutionOnHold(final String 
executorId,
    * Action for after task execution has failed but it's recoverable.
    * @param executorId    the ID of the executor
    * @param taskId   the ID of the task
-   * @param attemptIdx    the attempt index
    * @param newState      the state this situation
    * @param failureCause  the cause of failure
    */
-  private void onTaskExecutionFailedRecoverable(final String executorId, final 
String taskId,
-                                                final int attemptIdx, final 
TaskState.State newState,
+  private void onTaskExecutionFailedRecoverable(final String executorId,
+                                                final String taskId,
+                                                final TaskState.State newState,
                                                 final 
TaskState.RecoverableFailureCause failureCause) {
-    LOG.info("{} failed in {} by {}", new Object[]{taskId, executorId, 
failureCause});
-    
executorRegistry.getExecutorRepresenter(executorId).onTaskExecutionFailed(taskId);
+    LOG.info("{} failed in {} by {}", taskId, executorId, failureCause);
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      executor.onTaskExecutionFailed(taskId);
+      return Pair.of(executor, state);
+    });
 
     final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
-    final int attemptIndexForStage =
-        
jobStateManager.getAttemptCountForStage(RuntimeIdGenerator.getStageIdFromTaskId(taskId));
 
     switch (failureCause) {
       // Previous task must be re-executed, and incomplete tasks of the 
belonging stage must be rescheduled.
       case INPUT_READ_FAILURE:
-        if (attemptIdx == attemptIndexForStage) {
-          jobStateManager.onTaskStateChanged(taskId, newState);
-          LOG.info("All tasks of {} will be made failed_recoverable.", 
stageId);
-          for (final PhysicalStage stage : 
physicalPlan.getStageDAG().getTopologicalSort()) {
-            if (stage.getId().equals(stageId)) {
-              LOG.info("Removing Tasks for {} before they are scheduled to an 
executor", stage.getId());
-              pendingTaskCollection.removeTasksAndDescendants(stage.getId());
-              stage.getTaskIds().forEach(dstTaskId -> {
-                if 
(jobStateManager.getTaskState(dstTaskId).getStateMachine().getCurrentState()
-                    != TaskState.State.COMPLETE) {
-                  jobStateManager.onTaskStateChanged(dstTaskId, 
TaskState.State.FAILED_RECOVERABLE);
-                  blockManagerMaster.onProducerTaskFailed(dstTaskId);
-                }
-              });
-              break;
-            }
+        jobStateManager.onTaskStateChanged(taskId, newState);
+        LOG.info("All tasks of {} will be made failed_recoverable.", stageId);
+        for (final PhysicalStage stage : 
physicalPlan.getStageDAG().getTopologicalSort()) {
+          if (stage.getId().equals(stageId)) {
+            LOG.info("Removing Tasks for {} before they are scheduled to an 
executor", stage.getId());
+            pendingTaskCollection.removeTasksAndDescendants(stage.getId());
+            stage.getTaskIds().forEach(dstTaskId -> {
+              if 
(jobStateManager.getTaskState(dstTaskId).getStateMachine().getCurrentState()
+                  != TaskState.State.COMPLETE) {
+                jobStateManager.onTaskStateChanged(dstTaskId, 
TaskState.State.FAILED_RECOVERABLE);
+                blockManagerMaster.onProducerTaskFailed(dstTaskId);
+              }
+            });
+            break;
           }
-          // the stage this task belongs to has become failed recoverable.
-          // it is a good point to start searching for another stage to 
schedule.
-          scheduleNextStage(stageId);
-        } else if (attemptIdx < attemptIndexForStage) {
-          // if attemptIdx < attemptIndexForStage, we can ignore this late 
arriving message.
-          LOG.info("{} state change to failed_recoverable arrived late, we 
will ignore this.", taskId);
-        } else {
-          throw new SchedulingException(new Throwable("AttemptIdx for a task 
cannot be greater than its stage"));
         }
+        // the stage this task belongs to has become failed recoverable.
+        // it is a good point to start searching for another stage to schedule.
+        scheduleNextStage(stageId);
         break;
       // The task executed successfully but there is something wrong with the 
output store.
       case OUTPUT_WRITE_FAILURE:
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
index df896330..343151f0 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
@@ -15,190 +15,97 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
-import javax.annotation.Nonnull;
-import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
- * (WARNING) This class is not thread-safe.
- * (i.e., Only a SchedulingPolicy accesses this class)
- *
+ * (WARNING) This class must be thread-safe.
  * Maintains map between executor id and {@link ExecutorRepresenter}.
  */
 @DriverSide
-@NotThreadSafe
+@ThreadSafe
 public final class ExecutorRegistry {
-  private final Map<String, ExecutorRepresenter> runningExecutors;
-  private final Map<String, ExecutorRepresenter> failedExecutors;
-  private final Map<String, ExecutorRepresenter> completedExecutors;
-
-  @Inject
-  public ExecutorRegistry() {
-    this.runningExecutors = new HashMap<>();
-    this.failedExecutors = new HashMap<>();
-    this.completedExecutors = new HashMap<>();
-  }
-
-  @Override
-  public String toString() {
-    final StringBuffer sb = new StringBuffer();
-    sb.append("Running: ");
-    sb.append(runningExecutors.toString());
-    sb.append("/ Failed: ");
-    sb.append(failedExecutors.toString());
-    sb.append("/ Completed: ");
-    sb.append(completedExecutors.toString());
-    return sb.toString();
-  }
-
   /**
-   * @param executorId the executor id
-   * @return the corresponding {@link ExecutorRepresenter} that has not failed
-   * @throws NoSuchExecutorException when the executor was not found
+   * States of an executor.
    */
-  @Nonnull
-  public ExecutorRepresenter getExecutorRepresenter(final String executorId) 
throws NoSuchExecutorException {
-    try {
-      return getRunningExecutorRepresenter(executorId);
-    } catch (final NoSuchExecutorException e) {
-      return getFailedExecutorRepresenter(executorId);
-    }
+  enum ExecutorState {
+    RUNNING,
+    FAILED,
+    TERMINATED
   }
 
-  /**
-   * @param executorId the executor id
-   * @return the corresponding {@link ExecutorRepresenter} that has not failed
-   * @throws NoSuchExecutorException when the executor was not found
-   */
-  @Nonnull
-  public ExecutorRepresenter getRunningExecutorRepresenter(final String 
executorId) throws NoSuchExecutorException {
-    final ExecutorRepresenter representer = runningExecutors.get(executorId);
-    if (representer == null) {
-      throw new NoSuchExecutorException(executorId);
-    }
-    return representer;
+  private final Map<String, Pair<ExecutorRepresenter, ExecutorState>> 
executors;
+
+  @Inject
+  public ExecutorRegistry() {
+    this.executors = new HashMap<>();
   }
 
-  /**
-   * @param executorId the executor id
-   * @return the corresponding {@link ExecutorRepresenter} that has not failed
-   * @throws NoSuchExecutorException when the executor was not found
-   */
-  @Nonnull
-  public ExecutorRepresenter getFailedExecutorRepresenter(final String 
executorId) throws NoSuchExecutorException {
-    final ExecutorRepresenter representer = failedExecutors.get(executorId);
-    if (representer == null) {
-      throw new NoSuchExecutorException(executorId);
+  synchronized void registerExecutor(final ExecutorRepresenter executor) {
+    final String executorId = executor.getExecutorId();
+    if (executors.containsKey(executorId)) {
+      throw new IllegalArgumentException("Duplicate executor: " + 
executor.toString());
+    } else {
+      executors.put(executorId, Pair.of(executor, ExecutorState.RUNNING));
     }
-    return representer;
   }
 
-  /**
-   * Returns a {@link Set} of running executor ids in the registry.
-   * Note the set is not modifiable. Also, further changes in the registry 
will not be reflected to the set.
-   * @return a {@link Set} of executor ids for running executors in the 
registry
-   */
-  public Set<String> getRunningExecutorIds() {
-    return Collections.unmodifiableSet(new 
TreeSet<>(runningExecutors.keySet()));
+  synchronized void viewExecutors(final Consumer<Set<ExecutorRepresenter>> 
consumer) {
+    consumer.accept(getRunningExecutors());
   }
 
-  /**
-   * Adds executor representer.
-   * @param representer the {@link ExecutorRepresenter} to register.
-   * @throws DuplicateExecutorIdException on multiple attempts to register 
same representer,
-   *         or different representers with same executor id.
-   */
-  public void registerRepresenter(final ExecutorRepresenter representer) 
throws DuplicateExecutorIdException {
-    final String executorId = representer.getExecutorId();
-    if (failedExecutors.get(executorId) != null) {
-      throw new DuplicateExecutorIdException(executorId);
+  synchronized void updateExecutor(
+      final String executorId,
+      final BiFunction<ExecutorRepresenter, ExecutorState, 
Pair<ExecutorRepresenter, ExecutorState>> updater) {
+    final Pair<ExecutorRepresenter, ExecutorState> pair = 
executors.get(executorId);
+    if (pair == null) {
+      throw new IllegalArgumentException("Unknown executor id " + executorId);
+    } else {
+      executors.put(executorId, updater.apply(pair.left(), pair.right()));
     }
-    runningExecutors.compute(executorId, (id, existingRepresenter) -> {
-      if (existingRepresenter != null) {
-        throw new DuplicateExecutorIdException(id);
-      }
-      return representer;
-    });
   }
 
-  /**
-   * Moves the representer into the pool of representer of the failed 
executors.
-   * @param executorId the corresponding executor id
-   * @throws NoSuchExecutorException when the specified executor id is not 
registered, or already set as failed
-   */
-  public void setRepresenterAsFailed(final String executorId) throws 
NoSuchExecutorException {
-    final ExecutorRepresenter representer = 
runningExecutors.remove(executorId);
-    if (representer == null) {
-      throw new NoSuchExecutorException(executorId);
+  synchronized void terminate() {
+    for (final ExecutorRepresenter executor : getRunningExecutors()) {
+      executor.shutDown();
+      executors.put(executor.getExecutorId(), Pair.of(executor, 
ExecutorState.TERMINATED));
     }
-    failedExecutors.put(executorId, representer);
   }
 
   /**
-   * Moves the representer into the pool of representer of the failed 
executors.
-   * @param executorId the corresponding executor id
-   * @throws NoSuchExecutorException when the specified executor id is not 
registered, or already set as failed
+   * Retrieves the executor to which the given task was scheduled.
+   * @param taskId of the task to search.
+   * @return the {@link ExecutorRepresenter} of the executor the task was 
scheduled to.
    */
-  public void setRepresenterAsCompleted(final String executorId) throws 
NoSuchExecutorException {
-    final ExecutorRepresenter representer = 
runningExecutors.remove(executorId);
-    if (representer == null) {
-      throw new NoSuchExecutorException(executorId);
-    }
-    if (failedExecutors.containsKey(executorId)) {
-      throw new IllegalStateException(executorId + " is in " + 
failedExecutors);
-    }
-    if (completedExecutors.containsKey(executorId)) {
-      throw new IllegalStateException(executorId + " is already in " + 
completedExecutors);
+  @VisibleForTesting
+  synchronized Optional<ExecutorRepresenter> findExecutorForTask(final String 
taskId) {
+    for (final ExecutorRepresenter executor : getRunningExecutors()) {
+      if (executor.getRunningTasks().contains(taskId) || 
executor.getCompleteTasks().contains(taskId)) {
+        return Optional.of(executor);
+      }
     }
-
-    completedExecutors.put(executorId, representer);
+    return Optional.empty();
   }
 
-  /**
-   * Exception that indicates multiple attempts to register executors with 
same executor id.
-   */
-  public final class DuplicateExecutorIdException extends RuntimeException {
-    private final String executorId;
-
-    /**
-     * @param executorId the executor id that caused this exception
-     */
-    public DuplicateExecutorIdException(final String executorId) {
-      super(String.format("Duplicate executorId: %s", executorId));
-      this.executorId = executorId;
-    }
-
-    /**
-     * @return the executor id for this exception
-     */
-    public String getExecutorId() {
-      return executorId;
-    }
+  private Set<ExecutorRepresenter> getRunningExecutors() {
+    return executors.values()
+        .stream()
+        .filter(pair -> pair.right().equals(ExecutorState.RUNNING))
+        .map(Pair::left)
+        .collect(Collectors.toSet());
   }
 
-  /**
-   * Exception that indicates no executor for the specified executorId.
-   */
-  public final class NoSuchExecutorException extends RuntimeException {
-    private final String executorId;
-
-    /**
-     * @param executorId the executor id that caused this exception
-     */
-    public NoSuchExecutorException(final String executorId) {
-      super(String.format("No such executor: %s", executorId));
-      this.executorId = executorId;
-    }
-
-    /**
-     * @return the executor id for this exception
-     */
-    public String getExecutorId() {
-      return executorId;
-    }
+  @Override
+  public String toString() {
+    return executors.toString();
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index 0fcbe74c..d6696e9c 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -52,11 +52,11 @@
   ScheduledTask remove(final String taskId) throws NoSuchElementException;
 
   /**
-   * Peeks Tasks that can be scheduled according to job dependency priority.
+   * Peeks stage that can be scheduled according to job dependency priority.
    * Changes to the queue must not reflected to the returned collection to 
avoid concurrent modification.
-   * @return Tasks that can be scheduled, or {@link Optional#empty()} if the 
queue is empty
+   * @return stage that can be scheduled, or {@link Optional#empty()} if the 
queue is empty
    */
-  Optional<Collection<ScheduledTask>> peekSchedulableTasks();
+  Optional<Collection<ScheduledTask>> peekSchedulableStage();
 
   /**
    * Registers a job to this queue in case the queue needs to understand the 
topology of the job DAG.
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index a7a9c36f..f9bb73ac 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -27,7 +27,7 @@
 
 /**
  * Only two threads call scheduling code: RuntimeMaster thread (RMT), and 
SchedulerThread(ST).
- * RMT and ST meet only at two points: SchedulingPolicy, and 
PendingTaskCollection,
+ * RMT and ST meet only at two points: {@link ExecutorRegistry}, and {@link 
PendingTaskCollection},
  * which are synchronized(ThreadSafe).
  * Other scheduler-related classes that are accessed by only one of the two 
threads are not synchronized(NotThreadSafe).
  *
@@ -79,8 +79,8 @@ void scheduleJob(PhysicalPlan physicalPlan,
    */
   void onTaskStateChanged(String executorId,
                           String taskId,
-                          TaskState.State newState,
                           int attemptIdx,
+                          TaskState.State newState,
                           @Nullable String taskPutOnHold,
                           TaskState.RecoverableFailureCause failureCause);
 
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index f7db2512..fa98e399 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -27,10 +27,10 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,10 +48,11 @@
   private final Map<String, JobStateManager> jobStateManagers;
   private final PendingTaskCollection pendingTaskCollection;
   private final ExecutorService schedulerThread;
-  private boolean initialJobScheduled;
+  private AtomicBoolean isSchedulerRunning;
   private boolean isTerminated;
-  private final DelayedSignalingCondition 
mustCheckSchedulingAvailabilityOrSchedulerTerminated
-      = new DelayedSignalingCondition();
+
+  // (available executor AND available task to schedule) OR the scheduler has 
terminated
+  private final DelayedSignalingCondition canScheduleOrTerminated = new 
DelayedSignalingCondition();
   private ExecutorRegistry executorRegistry;
   private SchedulingPolicy schedulingPolicy;
 
@@ -63,7 +64,7 @@ public SchedulerRunner(final SchedulingPolicy 
schedulingPolicy,
     this.jobStateManagers = new HashMap<>();
     this.pendingTaskCollection = pendingTaskCollection;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new 
Thread(runnable, "SchedulerRunner"));
-    this.initialJobScheduled = false;
+    this.isSchedulerRunning = new AtomicBoolean(false);
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
     this.schedulingPolicy = schedulingPolicy;
@@ -73,14 +74,26 @@ public SchedulerRunner(final SchedulingPolicy 
schedulingPolicy,
    * Signals to the condition on executor availability.
    */
   public void onAnExecutorAvailable() {
-    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
+    canScheduleOrTerminated.signal();
   }
 
   /**
    * Signals to the condition on Task availability.
    */
   public void onATaskAvailable() {
-    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
+    canScheduleOrTerminated.signal();
+  }
+
+  /**
+   * Run the scheduler thread.
+   */
+  void runSchedulerThread() {
+    if (!isTerminated) {
+      if (!isSchedulerRunning.getAndSet(true)) {
+        schedulerThread.execute(new SchedulerThread());
+        schedulerThread.shutdown();
+      }
+    }
   }
 
   /**
@@ -90,81 +103,71 @@ public void onATaskAvailable() {
   void scheduleJob(final JobStateManager jobStateManager) {
     if (!isTerminated) {
       jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
-
-      if (!initialJobScheduled) {
-        initialJobScheduled = true;
-        schedulerThread.execute(new SchedulerThread());
-        schedulerThread.shutdown();
-      }
     } // else ignore new incoming jobs when terminated.
   }
 
   void terminate() {
-    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
-      final ExecutorRepresenter representer = 
executorRegistry.getRunningExecutorRepresenter(executorId);
-      representer.shutDown();
-      executorRegistry.setRepresenterAsCompleted(executorId);
-    }
     isTerminated = true;
-    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
+    canScheduleOrTerminated.signal();
+  }
+
+  void doScheduleStage() {
+    final Collection<ScheduledTask> stageToSchedule = 
pendingTaskCollection.peekSchedulableStage().orElse(null);
+    if (stageToSchedule == null) {
+      // Task queue is empty
+      LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
+      return;
+    }
+
+    final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be 
incremented in lambda
+    for (final ScheduledTask schedulableTask : stageToSchedule) {
+      final JobStateManager jobStateManager = 
jobStateManagers.get(schedulableTask.getJobId());
+      LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
+
+      executorRegistry.viewExecutors(executors -> {
+        final Set<ExecutorRepresenter> candidateExecutors =
+            schedulingPolicy.filterExecutorRepresenters(executors, 
schedulableTask);
+        final Optional<ExecutorRepresenter> firstCandidate = 
candidateExecutors.stream().findFirst();
+
+        if (firstCandidate.isPresent()) {
+          // update metadata first
+          jobStateManager.onTaskStateChanged(schedulableTask.getTaskId(), 
TaskState.State.EXECUTING);
+          pendingTaskCollection.remove(schedulableTask.getTaskId());
+          numScheduledTasks.incrementAndGet();
+
+          // send the task
+          final ExecutorRepresenter selectedExecutor = firstCandidate.get();
+          selectedExecutor.onTaskScheduled(schedulableTask);
+          LOG.debug("Successfully scheduled {}", schedulableTask.getTaskId());
+        } else {
+          LOG.debug("Failed to schedule {}", schedulableTask.getTaskId());
+        }
+      });
+    }
+
+    LOG.debug("Examined {} Tasks, scheduled {} Tasks", stageToSchedule.size(), 
numScheduledTasks);
+    if (stageToSchedule.size() == numScheduledTasks.get()) {
+      // Scheduled all tasks in the stage
+      // Immediately run next iteration to check whether there is another 
stage that can be scheduled
+      LOG.debug("Trying to schedule next Stage in the ScheduleGroup (if 
any)...");
+      canScheduleOrTerminated.signal();
+    }
   }
 
   /**
    * A separate thread is run to schedule tasks to executors.
+   * See comments in the {@link Scheduler} for avoiding race conditions.
    */
   private final class SchedulerThread implements Runnable {
     @Override
     public void run() {
       // Run the first iteration unconditionally
-      mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
+      canScheduleOrTerminated.signal();
 
       while (!isTerminated) {
         // Iteration guard
-        mustCheckSchedulingAvailabilityOrSchedulerTerminated.await();
-
-        final Collection<ScheduledTask> schedulableTasks = 
pendingTaskCollection
-            .peekSchedulableTasks().orElse(null);
-        if (schedulableTasks == null) {
-          // Task queue is empty
-          LOG.debug("PendingTaskCollection is empty. Awaiting for more 
Tasks...");
-          continue;
-        }
-
-        int numScheduledTasks = 0;
-        for (final ScheduledTask schedulableTask : schedulableTasks) {
-          final JobStateManager jobStateManager = 
jobStateManagers.get(schedulableTask.getJobId());
-          LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
-
-          final Set<ExecutorRepresenter> runningExecutorRepresenter =
-              executorRegistry.getRunningExecutorIds().stream()
-              .map(executorId -> 
executorRegistry.getExecutorRepresenter(executorId))
-              .collect(Collectors.toSet());
-
-          final Set<ExecutorRepresenter> candidateExecutors =
-              
schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, 
schedulableTask);
-
-          if (candidateExecutors.size() != 0) {
-            jobStateManager.onTaskStateChanged(schedulableTask.getTaskId(),
-                TaskState.State.EXECUTING);
-            final ExecutorRepresenter executor = 
candidateExecutors.stream().findFirst().get();
-            executor.onTaskScheduled(schedulableTask);
-
-            pendingTaskCollection.remove(schedulableTask.getTaskId());
-            numScheduledTasks++;
-            LOG.debug("Successfully scheduled {}", 
schedulableTask.getTaskId());
-          } else {
-            LOG.debug("Failed to schedule {}", schedulableTask.getTaskId());
-          }
-        }
-
-        LOG.debug("Examined {} Tasks, scheduled {} Tasks",
-            schedulableTasks.size(), numScheduledTasks);
-        if (schedulableTasks.size() == numScheduledTasks) {
-          // Scheduled all Tasks in the stage
-          // Immediately run next iteration to check whether there is another 
schedulable stage
-          LOG.debug("Trying to schedule next Stage in the ScheduleGroup (if 
any)...");
-          mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
-        }
+        canScheduleOrTerminated.await();
+        doScheduleStage();
       }
       jobStateManagers.values().forEach(jobStateManager -> {
         if (jobStateManager.getJobState().getStateMachine().getCurrentState() 
== JobState.State.COMPLETE) {
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index 526fbec2..a5f57361 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -31,7 +31,7 @@
 /**
  * {@link PendingTaskCollection} implementation.
  * This class provides two-level scheduling by keeping track of schedulable 
stages and stage-Task membership.
- * {@link #peekSchedulableTasks()} returns collection of Tasks which belong to 
one of the schedulable stages.
+ * {@link #peekSchedulableStage()} returns collection of Tasks which belong to 
one of the schedulable stages.
  */
 @ThreadSafe
 @DriverSide
@@ -73,12 +73,12 @@ public synchronized void add(final ScheduledTask 
scheduledTask) {
 
   /**
    * Removes the specified Task to be scheduled.
-   * The specified Task should belong to the collection from {@link 
#peekSchedulableTasks()}.
+   * The specified Task should belong to the collection from {@link 
#peekSchedulableStage()}.
    * @param taskId id of the Task
    * @return the specified Task
    * @throws NoSuchElementException if the specified Task is not in the queue,
    *                                or removing this Task breaks scheduling 
order
-   *                                (i.e. does not belong to the collection 
from {@link #peekSchedulableTasks()}.
+   *                                (i.e. does not belong to the collection 
from {@link #peekSchedulableStage()}.
    */
   @Override
   public synchronized ScheduledTask remove(final String taskId) throws 
NoSuchElementException {
@@ -115,7 +115,7 @@ public synchronized ScheduledTask remove(final String 
taskId) throws NoSuchEleme
    *         or {@link Optional#empty} if the queue is empty
    */
   @Override
-  public synchronized Optional<Collection<ScheduledTask>> 
peekSchedulableTasks() {
+  public synchronized Optional<Collection<ScheduledTask>> 
peekSchedulableStage() {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       return Optional.empty();
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
index 47c052f2..78466b27 100644
--- 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
@@ -73,8 +73,8 @@
 
   private static final int EXECUTOR_CAPACITY = 20;
 
-  // This schedule index will make sure that task events are not ignored
-  private static final int MAGIC_SCHEDULE_ATTEMPT_INDEX = Integer.MAX_VALUE;
+  // Assume no failures
+  private static final int SCHEDULE_ATTEMPT_INDEX = 1;
 
   @Before
   public void setUp() throws Exception {
@@ -89,7 +89,7 @@ public void setUp() throws Exception {
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
pendingTaskCollection,
+        new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollection,
             blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler, executorRegistry);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
@@ -164,7 +164,7 @@ private void scheduleAndCheckJobTermination(final 
PhysicalPlan plan) throws Inje
 
       stages.forEach(physicalStage -> {
         SchedulerTestUtil.completeStage(
-            jobStateManager, scheduler, executorRegistry, physicalStage, 
MAGIC_SCHEDULE_ATTEMPT_INDEX);
+            jobStateManager, scheduler, executorRegistry, physicalStage, 
SCHEDULE_ATTEMPT_INDEX);
       });
     }
 
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
index d2ee9037..5b483157 100644
--- 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
@@ -15,10 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
@@ -64,8 +61,7 @@
     PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, 
MetricMessageHandler.class})
 public final class FaultToleranceTest {
   private static final Logger LOG = 
LoggerFactory.getLogger(FaultToleranceTest.class.getName());
-  private DAGBuilder<IRVertex, IREdge> irDAGBuilder;
-  private Scheduler scheduler;
+
   private SchedulingPolicy schedulingPolicy;
   private SchedulerRunner schedulerRunner;
   private ExecutorRegistry executorRegistry;
@@ -78,20 +74,17 @@
   private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
   private final ExecutorService serExecutorService = 
Executors.newSingleThreadExecutor();
 
-  private static final int MAX_SCHEDULE_ATTEMPT = 3;
+  private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
 
   @Before
   public void setUp() throws Exception {
-    irDAGBuilder = new DAGBuilder<>();
-
     metricMessageHandler = mock(MetricMessageHandler.class);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
 
   }
 
-  private void setUpExecutors(final Collection<ExecutorRepresenter> executors,
-                              final boolean useMockSchedulerRunner) throws 
InjectionException {
+  private Scheduler setUpScheduler(final boolean useMockSchedulerRunner) 
throws InjectionException {
     final Injector injector = Tang.Factory.getTang().newInjector();
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
 
@@ -103,14 +96,8 @@ private void setUpExecutors(final 
Collection<ExecutorRepresenter> executors,
     } else {
       schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskCollection, executorRegistry);
     }
-    scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
pendingTaskCollection,
-            blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler, executorRegistry);
-
-    // Add nodes
-    for (final ExecutorRepresenter executor : executors) {
-      scheduler.onExecutorAdded(executor);
-    }
+    return new BatchSingleJobScheduler(schedulerRunner, pendingTaskCollection, 
blockManagerMaster,
+        pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
   }
 
   /**
@@ -133,7 +120,11 @@ public void testContainerRemoval() throws Exception {
     executors.add(a2);
     executors.add(a3);
 
-    setUpExecutors(executors, true);
+    final Scheduler scheduler = setUpScheduler(true);
+    for (final ExecutorRepresenter executor : executors) {
+      scheduler.onExecutorAdded(executor);
+    }
+
     final PhysicalPlan plan =
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
 
@@ -201,7 +192,10 @@ public void testOutputFailure() throws Exception {
     executors.add(a1);
     executors.add(a2);
     executors.add(a3);
-    setUpExecutors(executors, true);
+    final Scheduler scheduler = setUpScheduler(true);
+    for (final ExecutorRepresenter executor : executors) {
+      scheduler.onExecutorAdded(executor);
+    }
 
     final PhysicalPlan plan =
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
@@ -264,7 +258,10 @@ public void testInputReadFailure() throws Exception {
     executors.add(a1);
     executors.add(a2);
     executors.add(a3);
-    setUpExecutors(executors, true);
+    final Scheduler scheduler = setUpScheduler(true);
+    for (final ExecutorRepresenter executor : executors) {
+      scheduler.onExecutorAdded(executor);
+    }
 
     final PhysicalPlan plan =
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
@@ -310,7 +307,7 @@ public void testInputReadFailure() throws Exception {
   /**
    * Tests the rescheduling of Tasks upon a failure.
    */
-  @Test(timeout=10000)
+  @Test(timeout=20000)
   public void testTaskReexecutionForFailure() throws Exception {
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
@@ -318,43 +315,61 @@ public void testTaskReexecutionForFailure() throws 
Exception {
     final ResourceSpecification computeSpec = new 
ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
     final Function<String, ExecutorRepresenter> executorRepresenterGenerator = 
executorId ->
         new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, 
activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    executors.add(a1);
-    executors.add(a2);
-    executors.add(a3);
-
-    setUpExecutors(executors, false);
 
+    final Scheduler scheduler = setUpScheduler(false);
     final PhysicalPlan plan =
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
-
-
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
-
     scheduler.scheduleJob(plan, jobStateManager);
-    scheduler.onExecutorRemoved("a2");
 
+    final List<ExecutorRepresenter> executors = new ArrayList<>();
     final List<PhysicalStage> dagOf4Stages = 
plan.getStageDAG().getTopologicalSort();
 
+    int executorIdIndex = 1;
+    float removalChance = 0.7f; // Out of 1.0
+    final Random random = new Random(0); // Deterministic seed.
+
     for (final PhysicalStage stage : dagOf4Stages) {
-      while 
(jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState()
 != COMPLETE) {
-        final Set<String> a1RunningTasks = new HashSet<>(a1.getRunningTasks());
-        final Set<String> a3RunningTasks = new HashSet<>(a3.getRunningTasks());
 
-        a1RunningTasks.forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
+      while 
(jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState()
 != COMPLETE) {
+        // By chance, remove or add executor
+        if (isTrueByChance(random, removalChance)) {
+          // REMOVE EXECUTOR
+          if (!executors.isEmpty()) {
+            
scheduler.onExecutorRemoved(executors.remove(random.nextInt(executors.size())).getExecutorId());
+          } else {
+            // Skip, since no executor is running.
+          }
+        } else {
+          if (executors.size() < 3) {
+            // ADD EXECUTOR
+            final ExecutorRepresenter newExecutor = 
executorRepresenterGenerator.apply("a" + executorIdIndex);
+            executorIdIndex += 1;
+            executors.add(newExecutor);
+            scheduler.onExecutorAdded(newExecutor);
+          } else {
+            // Skip, in order to keep the total number of running executors 
below or equal to 3
+          }
+        }
 
-        a3RunningTasks.forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
+        // Complete the execution of tasks
+        if (!executors.isEmpty()) {
+          final int indexOfCompletedExecutor = 
random.nextInt(executors.size());
+          // New set for snapshotting
+          final Map<String, Integer> runningTaskSnapshot =
+              new 
HashMap<>(executors.get(indexOfCompletedExecutor).getRunningTaskToAttempt());
+          runningTaskSnapshot.entrySet().forEach(entry -> {
+            SchedulerTestUtil.sendTaskStateEventToScheduler(
+                scheduler, executorRegistry, entry.getKey(), 
TaskState.State.COMPLETE, entry.getValue());
+          });
+        }
       }
     }
     assertTrue(jobStateManager.checkJobTermination());
   }
+
+  private boolean isTrueByChance(final Random random, final float chance) {
+    return chance > random.nextDouble();
+  }
 }
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
index 36b489a7..a9d1d4b2 100644
--- 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
@@ -16,19 +16,17 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
-import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.Optional;
 
 /**
  * Utility class for runtime unit tests.
  */
-public final class SchedulerTestUtil {
+final class SchedulerTestUtil {
   /**
    * Complete the stage by completing all of its Tasks.
    * @param jobStateManager for the submitted job.
@@ -36,11 +34,11 @@
    * @param executorRegistry provides executor representers
    * @param physicalStage for which the states should be marked as complete.
    */
-  public static void completeStage(final JobStateManager jobStateManager,
-                                   final Scheduler scheduler,
-                                   final ExecutorRegistry executorRegistry,
-                                   final PhysicalStage physicalStage,
-                                   final int attemptIdx) {
+  static void completeStage(final JobStateManager jobStateManager,
+                            final Scheduler scheduler,
+                            final ExecutorRegistry executorRegistry,
+                            final PhysicalStage physicalStage,
+                            final int attemptIdx) {
     // Loop until the stage completes.
     while (true) {
       final Enum stageState = 
jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
@@ -76,73 +74,46 @@ public static void completeStage(final JobStateManager 
jobStateManager,
    * @param newState for the task.
    * @param cause in the case of a recoverable failure.
    */
-  public static void sendTaskStateEventToScheduler(final Scheduler scheduler,
-                                                   final ExecutorRegistry 
executorRegistry,
-                                                   final String taskId,
-                                                   final TaskState.State 
newState,
-                                                   final int attemptIdx,
-                                                   final 
TaskState.RecoverableFailureCause cause) {
-    ExecutorRepresenter scheduledExecutor;
-    do {
-      scheduledExecutor = findExecutorForTask(executorRegistry, taskId);
-    } while (scheduledExecutor == null);
-
-    scheduler.onTaskStateChanged(scheduledExecutor.getExecutorId(), taskId,
-        newState, attemptIdx, null, cause);
+  static void sendTaskStateEventToScheduler(final Scheduler scheduler,
+                                            final ExecutorRegistry 
executorRegistry,
+                                            final String taskId,
+                                            final TaskState.State newState,
+                                            final int attemptIdx,
+                                            final 
TaskState.RecoverableFailureCause cause) {
+    final ExecutorRepresenter scheduledExecutor;
+    while (true) {
+      final Optional<ExecutorRepresenter> optional = 
executorRegistry.findExecutorForTask(taskId);
+      if (optional.isPresent()) {
+        scheduledExecutor = optional.get();
+        break;
+      }
+    }
+    scheduler.onTaskStateChanged(scheduledExecutor.getExecutorId(), taskId, 
attemptIdx,
+        newState, null, cause);
   }
 
-  public static void sendTaskStateEventToScheduler(final Scheduler scheduler,
-                                                   final ExecutorRegistry 
executorRegistry,
-                                                   final String taskId,
-                                                   final TaskState.State 
newState,
-                                                   final int attemptIdx) {
+  static void sendTaskStateEventToScheduler(final Scheduler scheduler,
+                                            final ExecutorRegistry 
executorRegistry,
+                                            final String taskId,
+                                            final TaskState.State newState,
+                                            final int attemptIdx) {
     sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, 
newState, attemptIdx, null);
   }
 
-  public static void mockSchedulerRunner(final PendingTaskCollection 
pendingTaskCollection,
-                                         final SchedulingPolicy 
schedulingPolicy,
-                                         final JobStateManager jobStateManager,
-                                         final ExecutorRegistry 
executorRegistry,
-                                         final boolean isPartialSchedule) {
+  static void mockSchedulerRunner(final PendingTaskCollection 
pendingTaskCollection,
+                                  final SchedulingPolicy schedulingPolicy,
+                                  final JobStateManager jobStateManager,
+                                  final ExecutorRegistry executorRegistry,
+                                  final boolean isPartialSchedule) {
+    final SchedulerRunner schedulerRunner =
+        new SchedulerRunner(schedulingPolicy, pendingTaskCollection, 
executorRegistry);
+    schedulerRunner.scheduleJob(jobStateManager);
     while (!pendingTaskCollection.isEmpty()) {
-      final ScheduledTask taskToSchedule = pendingTaskCollection.remove(
-          
pendingTaskCollection.peekSchedulableTasks().get().iterator().next().getTaskId());
-
-      final Set<ExecutorRepresenter> runningExecutorRepresenter =
-          executorRegistry.getRunningExecutorIds().stream()
-              .map(executorId -> 
executorRegistry.getExecutorRepresenter(executorId))
-              .collect(Collectors.toSet());
-      final Set<ExecutorRepresenter> candidateExecutors =
-          
schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, 
taskToSchedule);
-      if (candidateExecutors.size() > 0) {
-        jobStateManager.onTaskStateChanged(taskToSchedule.getTaskId(),
-            TaskState.State.EXECUTING);
-        final ExecutorRepresenter executor = 
candidateExecutors.stream().findFirst().get();
-        executor.onTaskScheduled(taskToSchedule);
-      }
-
-      // Schedule only the first task.
+      schedulerRunner.doScheduleStage();
       if (isPartialSchedule) {
+        // Schedule only the first stage
         break;
       }
     }
   }
-
-  /**
-   * Retrieves the executor to which the given task was scheduled.
-   * @param taskId of the task to search.
-   * @param executorRegistry provides executor representers
-   * @return the {@link ExecutorRepresenter} of the executor the task was 
scheduled to.
-   */
-  private static ExecutorRepresenter findExecutorForTask(final 
ExecutorRegistry executorRegistry,
-                                                         final String taskId) {
-    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
-      final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
-      if (executor.getRunningTasks().contains(taskId)
-          || executor.getCompleteTasks().contains(taskId)) {
-        return executor;
-      }
-    }
-    return null;
-  }
 }
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
index b7b7a2ef..5aecc3d2 100644
--- 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
@@ -231,7 +231,7 @@ private String dequeueAndGetStageId() {
    */
   private ScheduledTask dequeue() {
     final Collection<ScheduledTask> scheduledTasks
-        = pendingTaskPriorityQueue.peekSchedulableTasks().get();
+        = pendingTaskPriorityQueue.peekSchedulableStage().get();
     return 
pendingTaskPriorityQueue.remove(scheduledTasks.iterator().next().getTaskId());
   }
 }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 025a3935..998faf5f 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -138,9 +138,8 @@ public void setUp() throws InjectionException {
     final SchedulingPolicy schedulingPolicy = 
injector.getInstance(CompositeSchedulingPolicy.class);
     final PendingTaskCollection taskQueue = new SingleJobTaskCollection();
     final SchedulerRunner schedulerRunner = new 
SchedulerRunner(schedulingPolicy, taskQueue, executorRegistry);
-    final Scheduler scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
taskQueue, master,
-            pubSubEventHandler, updatePhysicalPlanEventHandler, 
executorRegistry);
+    final Scheduler scheduler = new BatchSingleJobScheduler(
+        schedulerRunner, taskQueue, master, pubSubEventHandler, 
updatePhysicalPlanEventHandler, executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
 
     // Necessary for wiring up the message environments


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to