This is an automated email from the ASF dual-hosted git repository.

sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c42189  [NEMO-152] Support Multiple DAGs Execution in a Single User 
Program (#94)
0c42189 is described below

commit 0c4218994f5700272b2b826610d102e004e63b76
Author: Won Wook SONG <[email protected]>
AuthorDate: Thu Aug 9 13:56:45 2018 +0900

    [NEMO-152] Support Multiple DAGs Execution in a Single User Program (#94)
    
    JIRA: [NEMO-152: Support Multiple DAGs Execution in a Single User 
Program](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-152)
    
    **Major changes:**
    - Distinguishes the terms 'Job' and 'Plan' to be more specific about 
what is being run. Previously, a job had a single physical plan, so the two 
terms could be used interchangeably; now that a job can contain multiple 
physical plans, the terms must be kept distinct.
    - Fixes the scheduler and the scheduler runner to accept multiple physical 
plans
    
    **Minor changes to note:**
    - Fixes grammatical errors in the comments.
    - The convert method on the DAG had almost no usages and was misleading 
in the way it was used, so it was removed.
    - Allows multiple Spark contexts on an evaluator, so that an 
evaluator can be reused for multiple physical plans.
    
    **Tests for the changes:**
    - A sample program that performs a line count in one execution and a word 
count in another is added as an integration test. All existing tests pass with 
the changes.
    
    **Other comments:**
    - N/A
    
    resolves 
[NEMO-152](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-152)
---
 bin/json2dot.py                                    |  18 +--
 .../java/edu/snu/nemo/client/ClientEndpoint.java   |  24 ++--
 .../java/edu/snu/nemo/client/DriverEndpoint.java   |  46 +++----
 .../java/edu/snu/nemo/client/StateTranslator.java  |  11 +-
 .../edu/snu/nemo/client/ClientEndpointTest.java    |  26 ++--
 .../src/main/java/edu/snu/nemo/common/dag/DAG.java |  12 --
 .../nemo/compiler/backend/nemo/NemoBackend.java    |   5 +-
 .../frontend/beam/BeamStateTranslator.java         |   4 +-
 .../compiler/frontend/beam/NemoPipelineResult.java |   2 +-
 .../compiler/frontend/spark/sql/SparkSession.java  |   3 +
 .../snu/nemo/compiler/optimizer/policy/Policy.java |   1 +
 .../compiler/backend/nemo/DAGConverterTest.java    |   2 +-
 .../java/edu/snu/nemo/examples/spark/MRJava.java   |  22 +++-
 .../snu/nemo/runtime/common/metric/JobMetric.java  |  10 +-
 .../edu/snu/nemo/runtime/common/plan/Task.java     |  18 +--
 .../common/state/{JobState.java => PlanState.java} |  16 +--
 .../edu/snu/nemo/driver/UserApplicationRunner.java |  10 +-
 .../nemo/runtime/master/BlockManagerMaster.java    |   5 +-
 ...{JobStateManager.java => PlanStateManager.java} | 139 +++++++++++----------
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  24 ++--
 .../UpdatePhysicalPlanEventHandler.java            |   2 +-
 ...SingleJobScheduler.java => BatchScheduler.java} |  76 ++++++-----
 .../nemo/runtime/master/scheduler/Scheduler.java   |  16 +--
 .../runtime/master/scheduler/SchedulerRunner.java  |  24 ++--
 ...eManagerTest.java => PlanStateManagerTest.java} |  50 ++++----
 ...bSchedulerTest.java => BatchSchedulerTest.java} |  32 ++---
 .../master/scheduler/SchedulerTestUtil.java        |  14 +--
 .../runtime/master/scheduler/TaskRetryTest.java    |  50 ++++----
 .../runtime/common/plan/TestPlanGenerator.java     |   2 +-
 29 files changed, 336 insertions(+), 328 deletions(-)

diff --git a/bin/json2dot.py b/bin/json2dot.py
index 582f30a..f41146b 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -44,15 +44,15 @@ def stateToColor(state):
     except:
         return 'white'
 
-class JobState:
+class PlanState:
     def __init__(self, data):
-        self.id = data['jobId']
+        self.id = data['planId']
         self.stages = {}
         for stage in data['stages']:
             self.stages[stage['id']] = StageState(stage)
     @classmethod
     def empty(cls):
-        return cls({'jobId': None, 'stages': []})
+        return cls({'planId': None, 'stages': []})
     def get(self, id):
         try:
             return self.stages[id]
@@ -96,11 +96,11 @@ class DAG:
     A class for converting DAG to Graphviz representation.
     JSON representation should be formatted like what toString method in 
DAG.java does.
     '''
-    def __init__(self, dag, jobState):
+    def __init__(self, dag, planState):
         self.vertices = {}
         self.edges = []
         for vertex in dag['vertices']:
-            self.vertices[vertex['id']] = Vertex(vertex['id'], 
vertex['properties'], jobState.get(vertex['id']))
+            self.vertices[vertex['id']] = Vertex(vertex['id'], 
vertex['properties'], planState.get(vertex['id']))
         for edge in dag['edges']:
             self.edges.append(Edge(self.vertices[edge['src']], 
self.vertices[edge['dst']], edge['properties']))
     @property
@@ -178,7 +178,7 @@ class NormalVertex:
 class LoopVertex:
     def __init__(self, id, properties):
         self.id = id
-        self.dag = DAG(properties['DAG'], JobState.empty())
+        self.dag = DAG(properties['DAG'], PlanState.empty())
         self.remaining_iteration = properties['remainingIteration']
         self.executionProperties = properties['executionProperties']
         self.incoming = properties['dagIncomingEdges']
@@ -217,7 +217,7 @@ class Stage:
     def __init__(self, id, properties, state):
         self.id = id
         self.properties = properties
-        self.stageDAG = DAG(properties['irDag'], JobState.empty())
+        self.stageDAG = DAG(properties['irDag'], PlanState.empty())
         self.idx = getIdx()
         self.state = state
         self.executionProperties = self.properties['executionProperties']
@@ -316,9 +316,9 @@ class RuntimeEdge:
 
 def jsonToDot(jsonDict):
     try:
-        dag = DAG(jsonDict['dag'], JobState(jsonDict['jobState']))
+        dag = DAG(jsonDict['dag'], PlanState(jsonDict['planState']))
     except:
-        dag = DAG(jsonDict, JobState.empty())
+        dag = DAG(jsonDict, PlanState.empty())
     return 'digraph dag {compound=true; nodesep=1.0; forcelabels=true;' + 
dag.dot + '}'
 
 if __name__ == "__main__":
diff --git a/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java 
b/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
index 2d27647..b31807a 100644
--- a/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
+++ b/client/src/main/java/edu/snu/nemo/client/ClientEndpoint.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.client;
 
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -24,12 +24,12 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * A request endpoint in client side of a job.
+ * A request endpoint in client side of a plan.
  */
 public abstract class ClientEndpoint {
 
   /**
-   * The request endpoint in driver side of the job.
+   * The request endpoint in driver side of the plan.
    */
   private final AtomicReference<DriverEndpoint> driverEndpoint;
 
@@ -41,13 +41,13 @@ public abstract class ClientEndpoint {
   private static final long DEFAULT_DRIVER_WAIT_IN_MILLIS = 100;
 
   /**
-   * A {@link StateTranslator} for this job.
+   * A {@link StateTranslator} for this plan.
    */
   private final StateTranslator stateTranslator;
 
   /**
    * Constructor.
-   * @param stateTranslator translator to translate between the state of job 
and corresponding.
+   * @param stateTranslator translator to translate between the state of plan 
and corresponding.
    */
   public ClientEndpoint(final StateTranslator stateTranslator) {
     this.driverEndpoint = new AtomicReference<>();
@@ -57,7 +57,7 @@ public abstract class ClientEndpoint {
   }
 
   /**
-   * Connect the driver endpoint of this job.
+   * Connect the driver endpoint of this plan.
    * This method will be called by {@link DriverEndpoint}.
    *
    * @param dep connected with this client.
@@ -122,15 +122,15 @@ public abstract class ClientEndpoint {
   }
 
   /**
-   * Get the current state of the running job.
+   * Get the current state of the running plan.
    *
-   * @return the current state of the running job.
+   * @return the current state of the running plan.
    */
-  public final synchronized Enum getJobState() {
+  public final synchronized Enum getPlanState() {
     if (driverEndpoint.get() != null) {
       return stateTranslator.translateState(driverEndpoint.get().getState());
     } else {
-      return stateTranslator.translateState(JobState.State.READY);
+      return stateTranslator.translateState(PlanState.State.READY);
     }
   }
 
@@ -161,7 +161,7 @@ public abstract class ClientEndpoint {
         return stateTranslator.translateState(driverEndpoint.get().
             waitUntilFinish(timeout - unit.convert(consumedTime, 
TimeUnit.NANOSECONDS), unit));
       } else {
-        return JobState.State.READY;
+        return PlanState.State.READY;
       }
     }
   }
@@ -181,7 +181,7 @@ public abstract class ClientEndpoint {
       if (driverIsConnected) {
         return 
stateTranslator.translateState(driverEndpoint.get().waitUntilFinish());
       } else {
-        return JobState.State.READY;
+        return PlanState.State.READY;
       }
     }
   }
diff --git a/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java 
b/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
index fc33eea..4338877 100644
--- a/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
+++ b/client/src/main/java/edu/snu/nemo/client/DriverEndpoint.java
@@ -15,67 +15,67 @@
  */
 package edu.snu.nemo.client;
 
-import edu.snu.nemo.runtime.common.state.JobState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.common.state.PlanState;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 
 import java.util.concurrent.TimeUnit;
 
 /**
- * A request endpoint in driver side of a job.
+ * A request endpoint in driver side of a plan.
  */
 public final class DriverEndpoint {
 
   /**
-   * The {@link JobStateManager} of the running job.
+   * The {@link PlanStateManager} of the running plan.
    */
-  private final JobStateManager jobStateManager;
+  private final PlanStateManager planStateManager;
 
   /**
-   * The {@link ClientEndpoint} of the job.
+   * The {@link ClientEndpoint} of the plan.
    */
   private final ClientEndpoint clientEndpoint;
 
   /**
    * Construct an endpoint in driver side.
    * This method will be called by {@link ClientEndpoint}.
-   * @param jobStateManager of running job.
-   * @param clientEndpoint of running job.
+   * @param planStateManager of running plan.
+   * @param clientEndpoint of running plan.
    */
-  public DriverEndpoint(final JobStateManager jobStateManager,
+  public DriverEndpoint(final PlanStateManager planStateManager,
                         final ClientEndpoint clientEndpoint) {
-    this.jobStateManager = jobStateManager;
+    this.planStateManager = planStateManager;
     this.clientEndpoint = clientEndpoint;
     clientEndpoint.connectDriver(this);
   }
 
   /**
-   * Get the current state of the running job.
+   * Get the current state of the running plan.
    * This method will be called by {@link ClientEndpoint}.
-   * @return the current state of the running job.
+   * @return the current state of the running plan.
    */
-  JobState.State getState() {
-    return jobStateManager.getJobState();
+  PlanState.State getState() {
+    return planStateManager.getPlanState();
   }
 
   /**
-   * Wait for this job to be finished and return the final state.
+   * Wait for this plan to be finished and return the final state.
    * It wait for at most the given time.
    * This method will be called by {@link ClientEndpoint}.
    * @param timeout of waiting.
    * @param unit of the timeout.
-   * @return the final state of this job.
+   * @return the final state of this plan.
    */
-  JobState.State waitUntilFinish(final long timeout,
-                                 final TimeUnit unit) {
-    return jobStateManager.waitUntilFinish(timeout, unit);
+  PlanState.State waitUntilFinish(final long timeout,
+                                  final TimeUnit unit) {
+    return planStateManager.waitUntilFinish(timeout, unit);
   }
 
   /**
-   * Wait for this job to be finished and return the final state.
+   * Wait for this plan to be finished and return the final state.
    * This method will be called by {@link ClientEndpoint}.
-   * @return the final state of this job.
+   * @return the final state of this plan.
    */
-  JobState.State waitUntilFinish() {
-    return jobStateManager.waitUntilFinish();
+  PlanState.State waitUntilFinish() {
+    return planStateManager.waitUntilFinish();
   }
 }
diff --git a/client/src/main/java/edu/snu/nemo/client/StateTranslator.java 
b/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
index 35082d7..287a0a0 100644
--- a/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
+++ b/client/src/main/java/edu/snu/nemo/client/StateTranslator.java
@@ -15,19 +15,18 @@
  */
 package edu.snu.nemo.client;
 
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 
 /**
- * A class provides the translation between the state of job and corresponding
- * {@link ClientEndpoint}.
+ * A class provides the translation between the state of plan and 
corresponding {@link ClientEndpoint}.
  */
 public interface StateTranslator {
 
   /**
-   * Translate a job state of nemo to a corresponding client endpoint state.
+   * Translate a plan state of nemo to a corresponding client endpoint state.
    *
-   * @param jobState to translate.
+   * @param planState to translate.
    * @return the translated state.
    */
-  Enum translateState(final JobState.State jobState);
+  Enum translateState(final PlanState.State planState);
 }
diff --git a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java 
b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
index 3f0d1a0..744a992 100644
--- a/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
+++ b/client/src/test/java/edu/snu/nemo/client/ClientEndpointTest.java
@@ -16,10 +16,10 @@
 package edu.snu.nemo.client;
 
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,32 +50,32 @@ public class ClientEndpointTest {
     final StateTranslator stateTranslator = mock(StateTranslator.class);
     when(stateTranslator.translateState(any())).then(state -> 
state.getArgument(0));
     final ClientEndpoint clientEndpoint = new 
TestClientEndpoint(stateTranslator);
-    assertEquals(clientEndpoint.getJobState(), JobState.State.READY);
+    assertEquals(clientEndpoint.getPlanState(), PlanState.State.READY);
 
     // Wait for connection but not connected.
-    assertEquals(clientEndpoint.waitUntilJobFinish(100, 
TimeUnit.MILLISECONDS), JobState.State.READY);
+    assertEquals(clientEndpoint.waitUntilJobFinish(100, 
TimeUnit.MILLISECONDS), PlanState.State.READY);
 
-    // Create a JobStateManager of a dag and create a DriverEndpoint with it.
+    // Create a PlanStateManager of a dag and create a DriverEndpoint with it.
     final PhysicalPlan physicalPlan =
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
+    final PlanStateManager planStateManager =
+        new PlanStateManager(physicalPlan, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
 
-    final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, 
clientEndpoint);
+    final DriverEndpoint driverEndpoint = new DriverEndpoint(planStateManager, 
clientEndpoint);
 
     // Check the current state.
-    assertEquals(clientEndpoint.getJobState(), JobState.State.EXECUTING);
+    assertEquals(clientEndpoint.getPlanState(), PlanState.State.EXECUTING);
 
     // Wait for the job to finish but not finished
-    assertEquals(clientEndpoint.waitUntilJobFinish(100, 
TimeUnit.MILLISECONDS), JobState.State.EXECUTING);
+    assertEquals(clientEndpoint.waitUntilJobFinish(100, 
TimeUnit.MILLISECONDS), PlanState.State.EXECUTING);
 
     // Check finish.
     final List<String> tasks = 
physicalPlan.getStageDAG().getTopologicalSort().stream()
         .flatMap(stage -> stage.getTaskIds().stream())
         .collect(Collectors.toList());
-    tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId, 
TaskState.State.EXECUTING));
-    tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId, 
TaskState.State.COMPLETE));
-    assertEquals(JobState.State.COMPLETE, clientEndpoint.waitUntilJobFinish());
+    tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, 
TaskState.State.EXECUTING));
+    tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, 
TaskState.State.COMPLETE));
+    assertEquals(PlanState.State.COMPLETE, 
clientEndpoint.waitUntilJobFinish());
   }
 
   /**
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java 
b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
index aeaea6f..593a99b 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAG.java
@@ -27,7 +27,6 @@ import java.io.PrintWriter;
 import java.io.Serializable;
 import java.util.*;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -84,17 +83,6 @@ public final class DAG<V extends Vertex, E extends Edge<V>> 
implements Serializa
   }
 
   /**
-   * Converts a DAG into another DAG according to a function.
-   * @param function to apply when converting a DAG to another.
-   * @param <V2> the converted DAG's vertex type.
-   * @param <E2> the converted DAG's edge type.
-   * @return the converted DAG.
-   */
-  public <V2 extends Vertex, E2 extends Edge<V2>> DAG<V2, E2> convert(final 
Function<DAG<V, E>, DAG<V2, E2>> function) {
-    return function.apply(this);
-  }
-
-  /**
    * Retrieves the vertex given its ID.
    * @param id of the vertex to retrieve
    * @return the vertex
diff --git 
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
 
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 465fb77..3e933a2 100644
--- 
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ 
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -60,8 +60,7 @@ public final class NemoBackend implements 
Backend<PhysicalPlan> {
    */
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator 
physicalPlanGenerator) {
-    final DAG<Stage, StageEdge> stageDAG = 
irDAG.convert(physicalPlanGenerator);
-    final PhysicalPlan physicalPlan = new 
PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
-    return physicalPlan;
+    final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
+    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), 
stageDAG);
   }
 }
diff --git 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
index 0212c20..3f420e8 100644
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/BeamStateTranslator.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.compiler.frontend.beam;
 
 import edu.snu.nemo.client.StateTranslator;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 
 import static org.apache.beam.sdk.PipelineResult.State.*;
 
@@ -33,7 +33,7 @@ public final class BeamStateTranslator implements 
StateTranslator {
    * @return the translated state.
    */
   @Override
-  public Enum translateState(final JobState.State jobState) {
+  public Enum translateState(final PlanState.State jobState) {
     switch (jobState) {
       case READY:
         return RUNNING;
diff --git 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
index ffd09cf..cf0ab51 100644
--- 
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ 
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -37,7 +37,7 @@ public final class NemoPipelineResult extends ClientEndpoint 
implements Pipeline
 
   @Override
   public State getState() {
-    return (State) super.getJobState();
+    return (State) super.getPlanState();
   }
 
   @Override
diff --git 
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
 
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
index 65515cc..9393ec6 100644
--- 
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
+++ 
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
@@ -321,6 +321,9 @@ public final class SparkSession extends 
org.apache.spark.sql.SparkSession implem
       if (!options.containsKey("spark.master")) { // default spark_master 
option.
         return this.master("local[*]").getOrCreate();
       }
+      if (!options.containsKey("spark.driver.allowMultipleContexts")) {
+        return this.config("spark.driver.allowMultipleContexts", 
"true").getOrCreate();
+      }
 
       
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("ubuntu"));
 
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
index 6516012..60d897b 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/Policy.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 /**
  * An interface for policies, each of which is composed of a list of static 
optimization passes.
  * The list of static optimization passes are run in the order provided by the 
implementation.
+ * Most policies follow the implementation in {@link PolicyImpl}.
  */
 public interface Policy extends Serializable {
   /**
diff --git 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
index b4237bf..52cc006 100644
--- 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
+++ 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -76,7 +76,7 @@ public final class DAGConverterTest {
     final DAG<IRVertex, IREdge> irDAG = new 
TestPolicy().runCompileTimeOptimization(
         irDAGBuilder.buildWithoutSourceSinkCheck(), DAG.EMPTY_DAG_DIRECTORY);
     final DAG<Stage, StageEdge> DAGOfStages = 
physicalPlanGenerator.stagePartitionIrDAG(irDAG);
-    final DAG<Stage, StageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
+    final DAG<Stage, StageEdge> physicalDAG = 
physicalPlanGenerator.apply(irDAG);
 
     // Test DAG of stages
     final List<Stage> sortedDAGOfStages = DAGOfStages.getTopologicalSort();
diff --git 
a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java 
b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index e49dd0c..31ac261 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -67,5 +67,25 @@ public final class MRJava {
     }
   }
 
-  // TODO #152: enable execution of multiple jobs (call scheduleJob multiple 
times with caching).
+  @Test(timeout = TIMEOUT)
+  public void testSparkWordAndLineCount() throws Exception {
+    final String inputFileName = "test_input_wordcount_spark";
+    final String outputFileName = "test_output_wordcount_spark";
+    final String expectedOutputFilename = 
"expected_output_word_and_line_count";
+    final String inputFilePath = fileBasePath + inputFileName;
+    final String outputFilePath = fileBasePath + outputFileName;
+
+    JobLauncher.main(builder
+        .addJobId(JavaWordAndLineCount.class.getSimpleName() + "_test")
+        .addUserMain(JavaWordAndLineCount.class.getCanonicalName())
+        .addUserArgs(inputFilePath, outputFilePath)
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, 
expectedOutputFilename);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
 }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
index fea3a20..ffedf21 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
@@ -20,7 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,9 +29,9 @@ import java.util.List;
 /**
  * Metric class for Job (or {@link PhysicalPlan}).
  */
-public final class JobMetric implements StateMetric<JobState.State> {
+public final class JobMetric implements StateMetric<PlanState.State> {
   private String id;
-  private List<StateTransitionEvent<JobState.State>> stateTransitionEvents = 
new ArrayList<>();
+  private List<StateTransitionEvent<PlanState.State>> stateTransitionEvents = 
new ArrayList<>();
   private JsonNode stageDagJson;
 
   public JobMetric(final PhysicalPlan physicalPlan) {
@@ -63,12 +63,12 @@ public final class JobMetric implements 
StateMetric<JobState.State> {
   }
 
   @Override
-  public List<StateTransitionEvent<JobState.State>> getStateTransitionEvents() 
{
+  public List<StateTransitionEvent<PlanState.State>> 
getStateTransitionEvents() {
     return stateTransitionEvents;
   }
 
   @Override
-  public void addEvent(final JobState.State prevState, final JobState.State 
newState) {
+  public void addEvent(final PlanState.State prevState, final PlanState.State 
newState) {
     stateTransitionEvents.add(new 
StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
   }
 
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 7663a8c..0cfe863 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -28,7 +28,7 @@ import java.util.Optional;
  * A Task is a self-contained executable that can be executed on a machine.
  */
 public final class Task implements Serializable {
-  private final String jobId;
+  private final String planId;
   private final String taskId;
   private final List<StageEdge> taskIncomingEdges;
   private final List<StageEdge> taskOutgoingEdges;
@@ -40,7 +40,7 @@ public final class Task implements Serializable {
   /**
    * Constructor.
    *
-   * @param jobId                the id of the job.
+   * @param planId               the id of the physical plan.
    * @param taskId               the ID of the task.
    * @param attemptIdx           the attempt index.
    * @param executionProperties  {@link VertexExecutionProperty} map for the 
corresponding stage
@@ -49,7 +49,7 @@ public final class Task implements Serializable {
    * @param taskOutgoingEdges    the outgoing edges of the task.
    * @param irVertexIdToReadable the map between IRVertex id to readable.
    */
-  public Task(final String jobId,
+  public Task(final String planId,
               final String taskId,
               final int attemptIdx,
               final ExecutionPropertyMap<VertexExecutionProperty> 
executionProperties,
@@ -57,7 +57,7 @@ public final class Task implements Serializable {
               final List<StageEdge> taskIncomingEdges,
               final List<StageEdge> taskOutgoingEdges,
               final Map<String, Readable> irVertexIdToReadable) {
-    this.jobId = jobId;
+    this.planId = planId;
     this.taskId = taskId;
     this.attemptIdx = attemptIdx;
     this.executionProperties = executionProperties;
@@ -68,10 +68,10 @@ public final class Task implements Serializable {
   }
 
   /**
-   * @return the id of the job.
+   * @return the id of the plan.
    */
-  public String getJobId() {
-    return jobId;
+  public String getPlanId() {
+    return planId;
   }
 
   /**
@@ -138,8 +138,8 @@ public final class Task implements Serializable {
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("jobId: ");
-    sb.append(jobId);
+    sb.append("planId: ");
+    sb.append(planId);
     sb.append(" / taskId: ");
     sb.append(taskId);
     sb.append(" / attempt: ");
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
similarity index 79%
rename from 
runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
rename to 
runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
index 6d65691..7edafab 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/PlanState.java
@@ -20,10 +20,10 @@ import edu.snu.nemo.common.StateMachine;
 /**
  * Represents the states and their transitions of a physical plan.
  */
-public final class JobState {
+public final class PlanState {
   private final StateMachine stateMachine;
 
-  public JobState() {
+  public PlanState() {
     stateMachine = buildTaskStateMachine();
   }
 
@@ -31,16 +31,16 @@ public final class JobState {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
-    stateMachineBuilder.addState(State.READY, "The job has been created and 
submitted to runtime.");
-    stateMachineBuilder.addState(State.EXECUTING, "The job is executing (with 
its stages executing).");
-    stateMachineBuilder.addState(State.COMPLETE, "The job is complete.");
-    stateMachineBuilder.addState(State.FAILED, "Job failed.");
+    stateMachineBuilder.addState(State.READY, "The plan has been created and 
submitted to runtime.");
+    stateMachineBuilder.addState(State.EXECUTING, "The plan is executing (with 
its stages executing).");
+    stateMachineBuilder.addState(State.COMPLETE, "The plan is complete.");
+    stateMachineBuilder.addState(State.FAILED, "Plan failed.");
 
     // Add transitions
     stateMachineBuilder.addTransition(State.READY, State.EXECUTING,
         "Begin executing!");
     stateMachineBuilder.addTransition(State.EXECUTING, State.COMPLETE,
-        "All stages complete, job complete");
+        "All stages complete, plan complete");
     stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED,
         "Unrecoverable failure in a stage");
 
@@ -54,7 +54,7 @@ public final class JobState {
   }
 
   /**
-   * JobState.
+   * PlanState.
    */
   public enum State {
     READY,
diff --git 
a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java 
b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 52ea468..6863b6f 100644
--- 
a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ 
b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -26,7 +26,7 @@ import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.tang.Injector;
@@ -102,16 +102,16 @@ public final class UserApplicationRunner {
       physicalPlan.getStageDAG().storeJSON(dagDirectory, "plan", "physical 
execution plan by compiler");
 
       // Execute!
-      final Pair<JobStateManager, ScheduledExecutorService> executionResult =
+      final Pair<PlanStateManager, ScheduledExecutorService> executionResult =
           runtimeMaster.execute(physicalPlan, maxScheduleAttempt);
 
       // Wait for the job to finish and stop logging
-      final JobStateManager jobStateManager = executionResult.left();
+      final PlanStateManager planStateManager = executionResult.left();
       final ScheduledExecutorService dagLoggingExecutor = 
executionResult.right();
-      jobStateManager.waitUntilFinish();
+      planStateManager.waitUntilFinish();
       dagLoggingExecutor.shutdown();
 
-      jobStateManager.storeJSON(dagDirectory, "final");
+      planStateManager.storeJSON(dagDirectory, "final");
       LOG.info("{} is complete!", physicalPlan.getId());
     } catch (final Exception e) {
       throw new RuntimeException(e);
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index f07fe1b..5461039 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static edu.snu.nemo.runtime.common.state.BlockState.State.IN_PROGRESS;
+import static edu.snu.nemo.runtime.common.state.BlockState.State.NOT_AVAILABLE;
 
 /**
  * Master-side block manager.
@@ -229,8 +230,8 @@ public final class BlockManagerMaster {
     try {
       if (producerTaskIdToBlockIds.containsKey(scheduledTaskId)) {
         producerTaskIdToBlockIds.get(scheduledTaskId).forEach(blockId -> {
-          if (!blockIdToMetadata.get(blockId).getBlockState()
-              .getStateMachine().getCurrentState().equals(IN_PROGRESS)) {
+          if (blockIdToMetadata.get(blockId).getBlockState()
+              .getStateMachine().getCurrentState().equals(NOT_AVAILABLE)) {
             onBlockStateChanged(blockId, IN_PROGRESS, null);
           }
         });
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
similarity index 73%
rename from 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
rename to 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
index 9ef2f8e..dd9f302 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/PlanStateManager.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.StateMachine;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.StageState;
 
 import java.io.File;
@@ -48,23 +48,23 @@ import javax.annotation.concurrent.ThreadSafe;
 import static edu.snu.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
 
 /**
- * Maintains three levels of state machines (JobState, StageState, and 
TaskState) of a physical plan.
+ * Maintains three levels of state machines (PlanState, StageState, and 
TaskState) of a physical plan.
  * The main API this class provides is onTaskStateReportFromExecutor(), which 
directly changes a TaskState.
- * JobState and StageState are updated internally in the class, and can only 
be read from the outside.
+ * PlanState and StageState are updated internally in the class, and can only 
be read from the outside.
  *
  * (CONCURRENCY) The public methods of this class are synchronized.
  */
 @DriverSide
 @ThreadSafe
-public final class JobStateManager {
-  private static final Logger LOG = 
LoggerFactory.getLogger(JobStateManager.class.getName());
-  private final String jobId;
+public final class PlanStateManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PlanStateManager.class.getName());
+  private final String planId;
   private final int maxScheduleAttempt;
 
   /**
-   * The data structures below track the execution states of this job.
+   * The data structures below track the execution states of this plan.
    */
-  private final JobState jobState;
+  private final PlanState planState;
   private final Map<String, StageState> idToStageStates;
   private final Map<String, TaskState> idToTaskStates;
 
@@ -75,15 +75,15 @@ public final class JobStateManager {
   private final Map<String, Integer> taskIdToCurrentAttempt;
 
   /**
-   * Represents the job to manage.
+   * Represents the plan to manage.
    */
   private final PhysicalPlan physicalPlan;
 
   /**
-   * A lock and condition to check whether the job is finished or not.
+   * A lock and condition to check whether the plan is finished or not.
    */
   private final Lock finishLock;
-  private final Condition jobFinishedCondition;
+  private final Condition planFinishedCondition;
 
   /**
    * For metrics.
@@ -92,33 +92,33 @@ public final class JobStateManager {
 
   private MetricStore metricStore;
 
-  public JobStateManager(final PhysicalPlan physicalPlan,
-                         final MetricMessageHandler metricMessageHandler,
-                         final int maxScheduleAttempt) {
-    this.jobId = physicalPlan.getId();
+  public PlanStateManager(final PhysicalPlan physicalPlan,
+                          final MetricMessageHandler metricMessageHandler,
+                          final int maxScheduleAttempt) {
+    this.planId = physicalPlan.getId();
     this.physicalPlan = physicalPlan;
     this.metricMessageHandler = metricMessageHandler;
     this.maxScheduleAttempt = maxScheduleAttempt;
-    this.jobState = new JobState();
+    this.planState = new PlanState();
     this.idToStageStates = new HashMap<>();
     this.idToTaskStates = new HashMap<>();
     this.taskIdToCurrentAttempt = new HashMap<>();
     this.finishLock = new ReentrantLock();
-    this.jobFinishedCondition = finishLock.newCondition();
+    this.planFinishedCondition = finishLock.newCondition();
     this.metricStore = MetricStore.getStore();
 
-    metricStore.getOrCreateMetric(JobMetric.class, 
jobId).setStageDAG(physicalPlan.getStageDAG());
-    metricStore.triggerBroadcast(JobMetric.class, jobId);
+    metricStore.getOrCreateMetric(JobMetric.class, 
planId).setStageDAG(physicalPlan.getStageDAG());
+    metricStore.triggerBroadcast(JobMetric.class, planId);
     initializeComputationStates();
   }
 
   /**
-   * Initializes the states for the job/stages/tasks for this job.
+   * Initializes the states for the plan/stages/tasks for this plan.
    */
   private void initializeComputationStates() {
-    onJobStateChanged(JobState.State.EXECUTING);
+    onPlanStateChanged(PlanState.State.EXECUTING);
 
-    // Initialize the states for the job down to task-level.
+    // Initialize the states for the plan down to task-level.
     physicalPlan.getStageDAG().topologicalDo(stage -> {
       idToStageStates.put(stage.getId(), new StageState());
       stage.getTaskIds().forEach(taskId -> {
@@ -132,9 +132,9 @@ public final class JobStateManager {
    * Updates the state of a task.
    * Task state changes can occur both in master and executor.
    * State changes that occur in master are
-   * initiated in {@link 
edu.snu.nemo.runtime.master.scheduler.BatchSingleJobScheduler}.
+   * initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchScheduler}.
    * State changes that occur in executors are sent to master as a control 
message,
-   * and the call to this method is initiated in {@link 
edu.snu.nemo.runtime.master.scheduler.BatchSingleJobScheduler}
+   * and the call to this method is initiated in {@link 
edu.snu.nemo.runtime.master.scheduler.BatchScheduler}
    * when the message/event is received.
    *
    * @param taskId  the ID of the task.
@@ -226,97 +226,98 @@ public final class JobStateManager {
         new Object[]{stageId, stageStateMachine.getCurrentState(), 
newStageState});
     stageStateMachine.setState(newStageState);
 
-    // Change job state if needed
+    // Change plan state if needed
     final boolean allStagesCompleted = 
idToStageStates.values().stream().allMatch(state ->
         
state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
     if (allStagesCompleted) {
-      onJobStateChanged(JobState.State.COMPLETE);
+      onPlanStateChanged(PlanState.State.COMPLETE);
     }
   }
 
   /**
    * (PRIVATE METHOD)
-   * Updates the state of the job.
-   * @param newState of the job.
+   * Updates the state of the plan.
+   * @param newState of the plan.
    */
-  private void onJobStateChanged(final JobState.State newState) {
-    metricStore.getOrCreateMetric(JobMetric.class, jobId)
-        .addEvent((JobState.State) 
jobState.getStateMachine().getCurrentState(), newState);
-    metricStore.triggerBroadcast(JobMetric.class, jobId);
+  private void onPlanStateChanged(final PlanState.State newState) {
+    metricStore.getOrCreateMetric(JobMetric.class, planId)
+        .addEvent((PlanState.State) 
planState.getStateMachine().getCurrentState(), newState);
+    metricStore.triggerBroadcast(JobMetric.class, planId);
 
-    jobState.getStateMachine().setState(newState);
+    planState.getStateMachine().setState(newState);
 
-    if (newState == JobState.State.EXECUTING) {
-      LOG.debug("Executing Job ID {}...", this.jobId);
-    } else if (newState == JobState.State.COMPLETE || newState == 
JobState.State.FAILED) {
-      LOG.debug("Job ID {} {}!", new Object[]{jobId, newState});
+    if (newState == PlanState.State.EXECUTING) {
+      LOG.debug("Executing Plan ID {}...", this.planId);
+    } else if (newState == PlanState.State.COMPLETE || newState == 
PlanState.State.FAILED) {
+      LOG.debug("Plan ID {} {}!", new Object[]{planId, newState});
 
-      // Awake all threads waiting the finish of this job.
+      // Awake all threads waiting the finish of this plan.
       finishLock.lock();
 
       try {
-        jobFinishedCondition.signalAll();
+        planFinishedCondition.signalAll();
       } finally {
         finishLock.unlock();
       }
     } else {
-      throw new IllegalStateTransitionException(new Exception("Illegal Job 
State Transition"));
+      throw new IllegalStateTransitionException(new Exception("Illegal Plan 
State Transition"));
     }
   }
 
 
   /**
-   * Wait for this job to be finished and return the final state.
-   * @return the final state of this job.
+   * Wait for this plan to be finished and return the final state.
+   * @return the final state of this plan.
    */
-  public JobState.State waitUntilFinish() {
+  public PlanState.State waitUntilFinish() {
     finishLock.lock();
     try {
-      if (!isJobDone()) {
-        jobFinishedCondition.await();
+      if (!isPlanDone()) {
+        planFinishedCondition.await();
       }
     } catch (final InterruptedException e) {
-      LOG.warn("Interrupted during waiting the finish of Job ID {}", jobId);
+      LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
       Thread.currentThread().interrupt();
     } finally {
       finishLock.unlock();
     }
-    return getJobState();
+    return getPlanState();
   }
 
   /**
-   * Wait for this job to be finished and return the final state.
+   * Wait for this plan to be finished and return the final state.
    * It wait for at most the given time.
    * @param timeout of waiting.
    * @param unit of the timeout.
-   * @return the final state of this job.
+   * @return the final state of this plan.
    */
-  public JobState.State waitUntilFinish(final long timeout, final TimeUnit 
unit) {
+  public PlanState.State waitUntilFinish(final long timeout, final TimeUnit 
unit) {
     finishLock.lock();
     try {
-      if (!isJobDone()) {
-        if (!jobFinishedCondition.await(timeout, unit)) {
-          LOG.warn("Timeout during waiting the finish of Job ID {}", jobId);
+      if (!isPlanDone()) {
+        if (!planFinishedCondition.await(timeout, unit)) {
+          LOG.warn("Timeout during waiting the finish of Plan ID {}", planId);
         }
       }
     } catch (final InterruptedException e) {
-      LOG.warn("Interrupted during waiting the finish of Job ID {}", jobId);
+      LOG.warn("Interrupted during waiting the finish of Plan ID {}", planId);
       Thread.currentThread().interrupt();
     } finally {
       finishLock.unlock();
     }
-    return getJobState();
+    return getPlanState();
   }
 
-  public synchronized boolean isJobDone() {
-    return (getJobState() == JobState.State.COMPLETE || getJobState() == 
JobState.State.FAILED);
+  public synchronized boolean isPlanDone() {
+    return (getPlanState() == PlanState.State.COMPLETE || getPlanState() == 
PlanState.State.FAILED);
   }
-  public synchronized String getJobId() {
-    return jobId;
+
+  public synchronized String getPlanId() {
+    return planId;
   }
 
-  public synchronized JobState.State getJobState() {
-    return (JobState.State) jobState.getStateMachine().getCurrentState();
+  public synchronized PlanState.State getPlanState() {
+    return (PlanState.State) planState.getStateMachine().getCurrentState();
   }
 
   public synchronized StageState.State getStageState(final String stageId) {
@@ -341,7 +342,7 @@ public final class JobStateManager {
   }
 
   /**
-   * Stores JSON representation of job state into a file.
+   * Stores JSON representation of plan state into a file.
    * @param directory the directory which JSON representation is saved to
    * @param suffix suffix for file name
    */
@@ -350,29 +351,29 @@ public final class JobStateManager {
       return;
     }
 
-    final File file = new File(directory, jobId + "-" + suffix + ".json");
+    final File file = new File(directory, planId + "-" + suffix + ".json");
     file.getParentFile().mkdirs();
     try (final PrintWriter printWriter = new PrintWriter(file)) {
       printWriter.println(toStringWithPhysicalPlan());
-      LOG.debug(String.format("JSON representation of job state for %s(%s) was 
saved to %s",
-          jobId, suffix, file.getPath()));
+      LOG.debug(String.format("JSON representation of plan state for %s(%s) 
was saved to %s",
+          planId, suffix, file.getPath()));
     } catch (final IOException e) {
-      LOG.warn(String.format("Cannot store JSON representation of job state 
for %s(%s) to %s: %s",
-          jobId, suffix, file.getPath(), e.toString()));
+      LOG.warn(String.format("Cannot store JSON representation of plan state 
for %s(%s) to %s: %s",
+          planId, suffix, file.getPath(), e.toString()));
     }
   }
 
   public String toStringWithPhysicalPlan() {
     final StringBuilder sb = new StringBuilder("{");
     sb.append("\"dag\": 
").append(physicalPlan.getStageDAG().toString()).append(", ");
-    sb.append("\"jobState\": ").append(toString()).append("}");
+    sb.append("\"planState\": ").append(toString()).append("}");
     return sb.toString();
   }
 
   @Override
   public synchronized String toString() {
     final StringBuilder sb = new StringBuilder("{");
-    sb.append("\"jobId\": \"").append(jobId).append("\", ");
+    sb.append("\"planId\": \"").append(planId).append("\", ");
     sb.append("\"stages\": [");
     boolean isFirstStage = true;
     for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 0aef6a6..41ad243 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -62,7 +62,7 @@ import static 
edu.snu.nemo.runtime.common.state.TaskState.State.ON_HOLD;
  * Runtime Master is the central controller of Runtime.
  * Compiler submits an {@link PhysicalPlan} to Runtime Master to execute a job.
  * Runtime Master handles:
- *    a) Scheduling the job with {@link Scheduler}.
+ *    a) Scheduling the plan with {@link Scheduler}.
  *    b) Managing resources with {@link ContainerManager}.
  *    c) Managing blocks with {@link BlockManagerMaster}.
  *    d) Receiving and sending control messages with {@link 
MessageEnvironment}.
@@ -156,22 +156,22 @@ public final class RuntimeMaster {
    * @param plan to execute
    * @param maxScheduleAttempt the max number of times this plan/sub-part of 
the plan should be attempted.
    */
-  public Pair<JobStateManager, ScheduledExecutorService> execute(final 
PhysicalPlan plan,
-                                                                 final int 
maxScheduleAttempt) {
-    final Callable<Pair<JobStateManager, ScheduledExecutorService>> 
jobExecutionCallable = () -> {
+  public Pair<PlanStateManager, ScheduledExecutorService> execute(final 
PhysicalPlan plan,
+                                                                  final int 
maxScheduleAttempt) {
+    final Callable<Pair<PlanStateManager, ScheduledExecutorService>> 
planExecutionCallable = () -> {
       this.irVertices.addAll(plan.getIdToIRVertex().values());
       try {
         blockManagerMaster.initialize(plan);
-        final JobStateManager jobStateManager = new JobStateManager(plan, 
metricMessageHandler, maxScheduleAttempt);
-        scheduler.scheduleJob(plan, jobStateManager);
-        final ScheduledExecutorService dagLoggingExecutor = 
scheduleDagLogging(jobStateManager);
-        return Pair.of(jobStateManager, dagLoggingExecutor);
+        final PlanStateManager planStateManager = new PlanStateManager(plan, 
metricMessageHandler, maxScheduleAttempt);
+        scheduler.schedulePlan(plan, planStateManager);
+        final ScheduledExecutorService dagLoggingExecutor = 
scheduleDagLogging(planStateManager);
+        return Pair.of(planStateManager, dagLoggingExecutor);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
     };
     try {
-      return runtimeMasterThread.submit(jobExecutionCallable).get();
+      return runtimeMasterThread.submit(planExecutionCallable).get();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -444,17 +444,17 @@ public final class RuntimeMaster {
 
   /**
    * Schedules a periodic DAG logging thread.
-   * @param jobStateManager for the job the DAG should be logged.
+   * @param planStateManager for the plan the DAG should be logged.
    * TODO #20: RESTful APIs to Access Job State and Metric.
    * @return the scheduled executor service.
    */
-  private ScheduledExecutorService scheduleDagLogging(final JobStateManager 
jobStateManager) {
+  private ScheduledExecutorService scheduleDagLogging(final PlanStateManager 
planStateManager) {
     final ScheduledExecutorService dagLoggingExecutor = 
Executors.newSingleThreadScheduledExecutor();
     dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
       private int dagLogFileIndex = 0;
 
       public void run() {
-        jobStateManager.storeJSON(dagDirectory, 
String.valueOf(dagLogFileIndex++));
+        planStateManager.storeJSON(dagDirectory, 
String.valueOf(dagLogFileIndex++));
       }
     }, DAG_LOGGING_PERIOD, DAG_LOGGING_PERIOD, TimeUnit.MILLISECONDS);
 
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 4047cbb..9872f95 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -48,6 +48,6 @@ public final class UpdatePhysicalPlanEventHandler implements 
CompilerEventHandle
   public void onNext(final UpdatePhysicalPlanEvent updatePhysicalPlanEvent) {
     final PhysicalPlan newPlan = updatePhysicalPlanEvent.getNewPhysicalPlan();
 
-    this.scheduler.updateJob(newPlan.getId(), newPlan);
+    this.scheduler.updatePlan(newPlan.getId(), newPlan);
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
similarity index 88%
rename from 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
rename to 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 57db43b..ffee479 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -31,7 +31,7 @@ import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.LoggerFactory;
@@ -49,15 +49,15 @@ import org.slf4j.Logger;
  * (CONCURRENCY) Only a single dedicated thread should use the public methods 
of this class.
  * (i.e., runtimeMasterThread in RuntimeMaster)
  *
- * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute 
and schedules the Tasks.
+ * BatchScheduler receives a single {@link PhysicalPlan} to execute and 
schedules the Tasks.
  */
 @DriverSide
 @NotThreadSafe
-public final class BatchSingleJobScheduler implements Scheduler {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BatchSingleJobScheduler.class.getName());
+public final class BatchScheduler implements Scheduler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BatchScheduler.class.getName());
 
   /**
-   * Components related to scheduling the given job.
+   * Components related to scheduling the given plan.
    */
   private final SchedulerRunner schedulerRunner;
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
@@ -70,19 +70,19 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
   private final PubSubEventHandlerWrapper pubSubEventHandlerWrapper;
 
   /**
-   * The below variables depend on the submitted job to execute.
+   * The below variables depend on the submitted plan to execute.
    */
   private PhysicalPlan physicalPlan;
-  private JobStateManager jobStateManager;
+  private PlanStateManager planStateManager;
   private List<List<Stage>> sortedScheduleGroups;
 
   @Inject
-  private BatchSingleJobScheduler(final SchedulerRunner schedulerRunner,
-                                  final PendingTaskCollectionPointer 
pendingTaskCollectionPointer,
-                                  final BlockManagerMaster blockManagerMaster,
-                                  final PubSubEventHandlerWrapper 
pubSubEventHandlerWrapper,
-                                  final UpdatePhysicalPlanEventHandler 
updatePhysicalPlanEventHandler,
-                                  final ExecutorRegistry executorRegistry) {
+  private BatchScheduler(final SchedulerRunner schedulerRunner,
+                         final PendingTaskCollectionPointer 
pendingTaskCollectionPointer,
+                         final BlockManagerMaster blockManagerMaster,
+                         final PubSubEventHandlerWrapper 
pubSubEventHandlerWrapper,
+                         final UpdatePhysicalPlanEventHandler 
updatePhysicalPlanEventHandler,
+                         final ExecutorRegistry executorRegistry) {
     this.schedulerRunner = schedulerRunner;
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.blockManagerMaster = blockManagerMaster;
@@ -96,26 +96,22 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
   }
 
   /**
-   * @param physicalPlanOfJob of the job.
-   * @param jobStateManagerOfJob of the job.
+   * @param submittedPhysicalPlan the physical plan to schedule.
+   * @param submittedPlanStateManager the state manager of the plan.
    */
   @Override
-  public void scheduleJob(final PhysicalPlan physicalPlanOfJob, final 
JobStateManager jobStateManagerOfJob) {
-    if (this.physicalPlan != null || this.jobStateManager != null) {
-      throw new IllegalStateException("scheduleJob() has been called more than 
once");
-    }
+  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan, final 
PlanStateManager submittedPlanStateManager) {
+    LOG.info("Scheduled plan");
 
-    this.physicalPlan = physicalPlanOfJob;
-    this.jobStateManager = jobStateManagerOfJob;
+    this.physicalPlan = submittedPhysicalPlan;
+    this.planStateManager = submittedPlanStateManager;
 
-    schedulerRunner.run(jobStateManager);
-    LOG.info("Job to schedule: {}", this.physicalPlan.getId());
+    schedulerRunner.run(this.planStateManager);
+    LOG.info("Plan to schedule: {}", this.physicalPlan.getId());
 
-    this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices()
-        .stream()
+    this.sortedScheduleGroups = 
this.physicalPlan.getStageDAG().getVertices().stream()
         .collect(Collectors.groupingBy(Stage::getScheduleGroup))
-        .entrySet()
-        .stream()
+        .entrySet().stream()
         .sorted(Map.Entry.comparingByKey())
         .map(Map.Entry::getValue)
         .collect(Collectors.toList());
@@ -124,8 +120,8 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
   }
 
   @Override
-  public void updateJob(final String jobId, final PhysicalPlan 
newPhysicalPlan) {
-    // update the job in the scheduler.
+  public void updatePlan(final String planId, final PhysicalPlan 
newPhysicalPlan) {
+    // update the physical plan in the scheduler.
     // NOTE: what's already been executed is not modified in the new physical 
plan.
     this.physicalPlan = newPhysicalPlan;
   }
@@ -148,11 +144,11 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
                                             final TaskState.State newState,
                                             @Nullable final String 
vertexPutOnHold,
                                             final 
TaskState.RecoverableTaskFailureCause failureCause) {
-    final int currentTaskAttemptIndex = jobStateManager.getTaskAttempt(taskId);
+    final int currentTaskAttemptIndex = 
planStateManager.getTaskAttempt(taskId);
 
     if (taskAttemptIndex == currentTaskAttemptIndex) {
       // Do change state, as this notification is for the current task attempt.
-      jobStateManager.onTaskStateChanged(taskId, newState);
+      planStateManager.onTaskStateChanged(taskId, newState);
       switch (newState) {
         case COMPLETE:
           onTaskExecutionComplete(executorId, taskId);
@@ -165,7 +161,7 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
           onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
           break;
         case FAILED:
-          throw new UnrecoverableFailureException(new Exception(new 
StringBuffer().append("The job failed on Task #")
+          throw new UnrecoverableFailureException(new Exception(new 
StringBuffer().append("The plan failed on Task #")
               .append(taskId).append(" in Executor 
").append(executorId).toString()));
         case READY:
         case EXECUTING:
@@ -181,8 +177,8 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
         case ON_HOLD:
           // If the stage has completed
           final String stageIdForTaskUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskId(taskId);
-          if 
(jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE))
 {
-            if (!jobStateManager.isJobDone()) {
+          if 
(planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE))
 {
+            if (!planStateManager.isPlanDone()) {
               doSchedule();
             }
           }
@@ -296,7 +292,7 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
     return sortedScheduleGroups.stream()
         .filter(scheduleGroup -> scheduleGroup.stream()
             .map(Stage::getId)
-            .map(jobStateManager::getStageState)
+            .map(planStateManager::getStageState)
             .anyMatch(state -> state.equals(StageState.State.INCOMPLETE))) // 
any incomplete stage in the group
         .findFirst(); // selects the one with the smallest scheduling group 
index.
   }
@@ -309,7 +305,7 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
 
     final List<String> taskIdsToSchedule = new LinkedList<>();
     for (final String taskId : stageToSchedule.getTaskIds()) {
-      final TaskState.State taskState = jobStateManager.getTaskState(taskId);
+      final TaskState.State taskState = planStateManager.getTaskState(taskId);
 
       switch (taskState) {
         // Don't schedule these.
@@ -320,7 +316,7 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
 
         // These are schedulable.
         case SHOULD_RETRY:
-          jobStateManager.onTaskStateChanged(taskId, TaskState.State.READY);
+          planStateManager.onTaskStateChanged(taskId, TaskState.State.READY);
         case READY:
           taskIdsToSchedule.add(taskId);
           break;
@@ -337,7 +333,7 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
     taskIdsToSchedule.forEach(taskId -> {
       blockManagerMaster.onProducerTaskScheduled(taskId); // Notify the block 
manager early for push edges.
       final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
-      final int attemptIdx = jobStateManager.getTaskAttempt(taskId);
+      final int attemptIdx = planStateManager.getTaskAttempt(taskId);
       tasks.add(new Task(
           physicalPlan.getId(),
           taskId,
@@ -385,7 +381,7 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
     final String stageIdForTaskUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskId(taskId);
 
     final boolean stageComplete =
-        
jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+        
planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
 
     if (stageComplete) {
       // get optimization vertex from the task.
@@ -441,7 +437,7 @@ public final class BatchSingleJobScheduler implements 
Scheduler {
     final Set<String> tasksToRetry = Sets.union(tasks, requiredParents);
     LOG.info("Will be retried: {}", tasksToRetry);
     tasksToRetry.forEach(
-        taskToReExecute -> jobStateManager.onTaskStateChanged(taskToReExecute, 
TaskState.State.SHOULD_RETRY));
+        taskToReExecute -> 
planStateManager.onTaskStateChanged(taskToReExecute, 
TaskState.State.SHOULD_RETRY));
   }
 
   private Set<String> recursivelyGetParentTasksForLostBlocks(final Set<String> 
children) {
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index aaa82e0..f8413f1 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -31,23 +31,23 @@ import javax.annotation.Nullable;
  * Other scheduler-related classes that are accessed by only one of the two 
threads are not synchronized(NotThreadSafe).
  */
 @DriverSide
-@DefaultImplementation(BatchSingleJobScheduler.class)
+@DefaultImplementation(BatchScheduler.class)
 public interface Scheduler {
 
   /**
-   * Schedules the given job.
+   * Schedules the given plan.
    * @param physicalPlan of the job being submitted.
-   * @param jobStateManager to manage the states of the submitted job.
+   * @param planStateManager to manage the states of the submitted plan.
    */
-  void scheduleJob(PhysicalPlan physicalPlan,
-                   JobStateManager jobStateManager);
+  void schedulePlan(PhysicalPlan physicalPlan,
+                    PlanStateManager planStateManager);
 
   /**
    * Receives and updates the scheduler with a new physical plan for a job.
-   * @param jobId the ID of the job to change the physical plan.
+   * @param planId the ID of the physical plan to change.
    * @param newPhysicalPlan new physical plan for the job.
    */
-  void updateJob(String jobId, PhysicalPlan newPhysicalPlan);
+  void updatePlan(String planId, PhysicalPlan newPhysicalPlan);
 
   /**
    * Called when an executor is added to Runtime, so that the extra resource 
can be used to execute the job.
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index defc91b..750c0fc 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -46,7 +46,7 @@ import javax.inject.Inject;
 @NotThreadSafe
 public final class SchedulerRunner {
   private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerRunner.class.getName());
-  private final Map<String, JobStateManager> jobStateManagers;
+  private final Map<String, PlanStateManager> planStateManagers;
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorService schedulerThread;
   private boolean isSchedulerRunning;
@@ -62,7 +62,7 @@ public final class SchedulerRunner {
                           final SchedulingPolicy schedulingPolicy,
                           final PendingTaskCollectionPointer 
pendingTaskCollectionPointer,
                           final ExecutorRegistry executorRegistry) {
-    this.jobStateManagers = new HashMap<>();
+    this.planStateManagers = new HashMap<>();
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
         new Thread(runnable, "SchedulerRunner thread"));
@@ -84,11 +84,11 @@ public final class SchedulerRunner {
         doScheduleTaskList();
         schedulingIteration.await();
       }
-      jobStateManagers.values().forEach(jobStateManager -> {
-        if (jobStateManager.isJobDone()) {
-          LOG.info("{} is complete.", jobStateManager.getJobId());
+      planStateManagers.values().forEach(planStateManager -> {
+        if (planStateManager.isPlanDone()) {
+          LOG.info("{} is complete.", planStateManager.getPlanId());
         } else {
-          LOG.info("{} is incomplete.", jobStateManager.getJobId());
+          LOG.info("{} is incomplete.", planStateManager.getPlanId());
         }
       });
       LOG.info("SchedulerRunner Terminated!");
@@ -106,8 +106,8 @@ public final class SchedulerRunner {
     final Collection<Task> taskList = taskListOptional.get();
     final List<Task> couldNotSchedule = new ArrayList<>();
     for (final Task task : taskList) {
-      final JobStateManager jobStateManager = 
jobStateManagers.get(task.getJobId());
-      if 
(!jobStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY)) 
{
+      final PlanStateManager planStateManager = 
planStateManagers.get(task.getPlanId());
+      if 
(!planStateManager.getTaskState(task.getTaskId()).equals(TaskState.State.READY))
 {
         // Guard against race conditions causing duplicate task launches
         LOG.debug("Skipping {} as it is not READY", task.getTaskId());
         continue;
@@ -128,7 +128,7 @@ public final class SchedulerRunner {
           final ExecutorRepresenter selectedExecutor
               = schedulingPolicy.selectExecutor(candidateExecutors.getValue(), 
task);
           // update metadata first
-          jobStateManager.onTaskStateChanged(task.getTaskId(), 
TaskState.State.EXECUTING);
+          planStateManager.onTaskStateChanged(task.getTaskId(), 
TaskState.State.EXECUTING);
 
           LOG.info("{} scheduled to {}", task.getTaskId(), 
selectedExecutor.getExecutorId());
 
@@ -164,9 +164,9 @@ public final class SchedulerRunner {
   /**
    * Run the scheduler thread.
    */
-  void run(final JobStateManager jobStateManager) {
+  void run(final PlanStateManager planStateManager) {
+    planStateManagers.put(planStateManager.getPlanId(), planStateManager);
     if (!isTerminated && !isSchedulerRunning) {
-      jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
       schedulerThread.execute(new SchedulerThread());
       schedulerThread.shutdown();
       isSchedulerRunning = true;
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
similarity index 67%
rename from 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
index 73176be..2003153 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/PlanStateManagerTest.java
@@ -22,7 +22,7 @@ import 
edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.common.plan.TestPlanGenerator;
@@ -42,11 +42,11 @@ import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests {@link JobStateManager}.
+ * Tests {@link PlanStateManager}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(MetricMessageHandler.class)
-public final class JobStateManagerTest {
+public final class PlanStateManagerTest {
   private static final int MAX_SCHEDULE_ATTEMPT = 2;
   private MetricMessageHandler metricMessageHandler;
 
@@ -59,17 +59,17 @@ public final class JobStateManagerTest {
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link JobStateManager}.
+   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link PlanStateManager}.
    * State changes are explicitly called to check whether states are managed 
correctly or not.
    */
   @Test
   public void testPhysicalPlanStateChanges() throws Exception {
     final PhysicalPlan physicalPlan =
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
+    final PlanStateManager planStateManager =
+        new PlanStateManager(physicalPlan, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
 
-    assertEquals(jobStateManager.getJobId(), "TestPlan");
+    assertEquals(planStateManager.getPlanId(), "TestPlan");
 
     final List<Stage> stageList = 
physicalPlan.getStageDAG().getTopologicalSort();
 
@@ -77,45 +77,45 @@ public final class JobStateManagerTest {
       final Stage stage = stageList.get(stageIdx);
       final List<String> taskIds = stage.getTaskIds();
       taskIds.forEach(taskId -> {
-        jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
-        jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
+        planStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
+        planStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
         if (RuntimeIdGenerator.getIndexFromTaskId(taskId) == taskIds.size() - 
1) {
-          assertEquals(StageState.State.COMPLETE, 
jobStateManager.getStageState(stage.getId()));
+          assertEquals(StageState.State.COMPLETE, 
planStateManager.getStageState(stage.getId()));
         }
       });
-      taskIds.forEach(taskId -> 
assertEquals(jobStateManager.getTaskState(taskId), TaskState.State.COMPLETE));
+      taskIds.forEach(taskId -> 
assertEquals(planStateManager.getTaskState(taskId), TaskState.State.COMPLETE));
 
       if (stageIdx == stageList.size() - 1) {
-        assertEquals(jobStateManager.getJobState(), JobState.State.COMPLETE);
+        assertEquals(planStateManager.getPlanState(), 
PlanState.State.COMPLETE);
       }
     }
   }
 
   /**
-   * Test whether the methods waiting finish of job works properly.
+   * Test whether the methods waiting for the finish of the plan work 
properly.
    */
   @Test(timeout = 2000)
   public void testWaitUntilFinish() throws Exception {
     final PhysicalPlan physicalPlan =
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(physicalPlan, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
+    final PlanStateManager planStateManager =
+        new PlanStateManager(physicalPlan, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
 
-    assertFalse(jobStateManager.isJobDone());
+    assertFalse(planStateManager.isPlanDone());
 
-    // Wait for the job to finish and check the job state.
+    // Wait for the plan to finish and check the plan state.
     // It have to return EXECUTING state after timeout.
-    final JobState.State executingState = jobStateManager.waitUntilFinish(100, 
TimeUnit.MILLISECONDS);
-    assertEquals(JobState.State.EXECUTING, executingState);
+    final PlanState.State executingState = 
planStateManager.waitUntilFinish(100, TimeUnit.MILLISECONDS);
+    assertEquals(PlanState.State.EXECUTING, executingState);
 
-    // Complete the job and check the result again.
-    // It have to return COMPLETE.
+    // Complete the plan and check the result again.
+    // It has to return COMPLETE.
     final List<String> tasks = 
physicalPlan.getStageDAG().getTopologicalSort().stream()
         .flatMap(stage -> stage.getTaskIds().stream())
         .collect(Collectors.toList());
-    tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId, 
TaskState.State.EXECUTING));
-    tasks.forEach(taskId -> jobStateManager.onTaskStateChanged(taskId, 
TaskState.State.COMPLETE));
-    final JobState.State completedState = jobStateManager.waitUntilFinish();
-    assertEquals(JobState.State.COMPLETE, completedState);
+    tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, 
TaskState.State.EXECUTING));
+    tasks.forEach(taskId -> planStateManager.onTaskStateChanged(taskId, 
TaskState.State.COMPLETE));
+    final PlanState.State completedState = planStateManager.waitUntilFinish();
+    assertEquals(PlanState.State.COMPLETE, completedState);
   }
 }
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
similarity index 88%
rename from 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index cfa6194..9733c75 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
@@ -54,13 +54,13 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests {@link BatchSingleJobScheduler}.
+ * Tests {@link BatchScheduler}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ContainerManager.class, BlockManagerMaster.class,
     PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, 
MetricMessageHandler.class})
-public final class BatchSingleJobSchedulerTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName());
+public final class BatchSchedulerTest {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BatchSchedulerTest.class.getName());
   private Scheduler scheduler;
   private ExecutorRegistry executorRegistry;
   private final MetricMessageHandler metricMessageHandler = 
mock(MetricMessageHandler.class);
@@ -80,7 +80,7 @@ public final class BatchSingleJobSchedulerTest {
     injector.bindVolatileInstance(BlockManagerMaster.class, 
mock(BlockManagerMaster.class));
     injector.bindVolatileInstance(PubSubEventHandlerWrapper.class, 
mock(PubSubEventHandlerWrapper.class));
     injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class, 
mock(UpdatePhysicalPlanEventHandler.class));
-    scheduler = injector.getInstance(BatchSingleJobScheduler.class);
+    scheduler = injector.getInstance(BatchScheduler.class);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
@@ -114,28 +114,28 @@ public final class BatchSingleJobSchedulerTest {
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link BatchSingleJobScheduler}.
+   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link BatchScheduler}.
    * Task state changes are explicitly submitted to scheduler instead of 
executor messages.
    */
   @Test(timeout=10000)
   public void testPull() throws Exception {
-    scheduleAndCheckJobTermination(
+    scheduleAndCheckPlanTermination(
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false));
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link BatchSingleJobScheduler}.
+   * This method builds a physical DAG starting from an IR DAG and submits it 
to {@link BatchScheduler}.
    * Task state changes are explicitly submitted to scheduler instead of 
executor messages.
    */
   @Test(timeout=10000)
   public void testPush() throws Exception {
-    scheduleAndCheckJobTermination(
+    scheduleAndCheckPlanTermination(
         
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 true));
   }
 
-  private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws 
InjectionException {
-    final JobStateManager jobStateManager = new JobStateManager(plan, 
metricMessageHandler, 1);
-    scheduler.scheduleJob(plan, jobStateManager);
+  private void scheduleAndCheckPlanTermination(final PhysicalPlan plan) throws 
InjectionException {
+    final PlanStateManager planStateManager = new PlanStateManager(plan, 
metricMessageHandler, 1);
+    scheduler.schedulePlan(plan, planStateManager);
 
     // For each ScheduleGroup, test if the tasks of the next ScheduleGroup are 
scheduled
     // after the stages of each ScheduleGroup are made "complete".
@@ -146,14 +146,14 @@ public final class BatchSingleJobSchedulerTest {
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the 
executing state", scheduleGroupIdx);
       stages.forEach(stage -> {
         SchedulerTestUtil.completeStage(
-            jobStateManager, scheduler, executorRegistry, stage, 
SCHEDULE_ATTEMPT_INDEX);
+            planStateManager, scheduler, executorRegistry, stage, 
SCHEDULE_ATTEMPT_INDEX);
       });
     }
 
-    LOG.debug("Waiting for job termination after sending stage completion 
events");
-    while (!jobStateManager.isJobDone()) {
+    LOG.debug("Waiting for plan termination after sending stage completion 
events");
+    while (!planStateManager.isPlanDone()) {
     }
-    assertTrue(jobStateManager.isJobDone());
+    assertTrue(planStateManager.isPlanDone());
   }
 
   private List<Stage> filterStagesWithAScheduleGroup(
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index 815dff8..d31965b 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import java.util.Optional;
@@ -29,25 +29,25 @@ import java.util.Optional;
 final class SchedulerTestUtil {
   /**
    * Complete the stage by completing all of its Tasks.
-   * @param jobStateManager for the submitted job.
-   * @param scheduler for the submitted job.
+   * @param planStateManager for the submitted plan.
+   * @param scheduler for the submitted plan.
    * @param executorRegistry provides executor representers
    * @param stage for which the states should be marked as complete.
    */
-  static void completeStage(final JobStateManager jobStateManager,
+  static void completeStage(final PlanStateManager planStateManager,
                             final Scheduler scheduler,
                             final ExecutorRegistry executorRegistry,
                             final Stage stage,
                             final int attemptIdx) {
     // Loop until the stage completes.
     while (true) {
-      final StageState.State stageState = 
jobStateManager.getStageState(stage.getId());
+      final StageState.State stageState = 
planStateManager.getStageState(stage.getId());
       if (StageState.State.COMPLETE == stageState) {
         // Stage has completed, so we break out of the loop.
         break;
       } else if (StageState.State.INCOMPLETE == stageState) {
         stage.getTaskIds().forEach(taskId -> {
-          final TaskState.State taskState = 
jobStateManager.getTaskState(taskId);
+          final TaskState.State taskState = 
planStateManager.getTaskState(taskId);
           if (TaskState.State.EXECUTING == taskState) {
             sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId,
                 TaskState.State.COMPLETE, attemptIdx, null);
@@ -66,7 +66,7 @@ final class SchedulerTestUtil {
   /**
    * Sends task state change event to scheduler.
    * This replaces executor's task completion messages for testing purposes.
-   * @param scheduler for the submitted job.
+   * @param scheduler for the submitted plan.
    * @param executorRegistry provides executor representers
    * @param taskId for the task to change the state.
    * @param newState for the task.
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index cd54432..e1ab449 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -20,10 +20,10 @@ import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.PlanState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.PlanStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -68,7 +68,7 @@ public final class TaskRetryTest {
   private Random random;
   private Scheduler scheduler;
   private ExecutorRegistry executorRegistry;
-  private JobStateManager jobStateManager;
+  private PlanStateManager planStateManager;
 
   private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
 
@@ -91,14 +91,14 @@ public final class TaskRetryTest {
     injector.bindVolatileInstance(BlockManagerMaster.class, 
mock(BlockManagerMaster.class));
     scheduler = injector.getInstance(Scheduler.class);
 
-    // Get JobStateManager
-    jobStateManager = 
runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined);
+    // Get PlanStateManager
+    planStateManager = 
runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined);
   }
 
   @Test(timeout=7000)
   public void testExecutorRemoved() throws Exception {
-    // Until the job finishes, events happen
-    while (!jobStateManager.isJobDone()) {
+    // Until the plan finishes, events happen
+    while (!planStateManager.isPlanDone()) {
       // 50% chance remove, 50% chance add, 80% chance task completed
       executorRemoved(0.5);
       executorAdded(0.5);
@@ -108,9 +108,9 @@ public final class TaskRetryTest {
       Thread.sleep(10);
     }
 
-    // Job should COMPLETE
-    assertEquals(JobState.State.COMPLETE, jobStateManager.getJobState());
-    assertTrue(jobStateManager.isJobDone());
+    // Plan should COMPLETE
+    assertEquals(PlanState.State.COMPLETE, planStateManager.getPlanState());
+    assertTrue(planStateManager.isPlanDone());
   }
 
   @Test(timeout=7000)
@@ -120,8 +120,8 @@ public final class TaskRetryTest {
     executorAdded(1.0);
     executorAdded(1.0);
 
-    // Until the job finishes, events happen
-    while (!jobStateManager.isJobDone()) {
+    // Until the plan finishes, events happen
+    while (!planStateManager.isPlanDone()) {
       // 50% chance task completed
       // 50% chance task output write failed
       taskCompleted(0.5);
@@ -131,9 +131,9 @@ public final class TaskRetryTest {
       Thread.sleep(10);
     }
 
-    // Job should COMPLETE
-    assertEquals(JobState.State.COMPLETE, jobStateManager.getJobState());
-    assertTrue(jobStateManager.isJobDone());
+    // Plan should COMPLETE
+    assertEquals(PlanState.State.COMPLETE, planStateManager.getPlanState());
+    assertTrue(planStateManager.isPlanDone());
   }
 
   ////////////////////////////////////////////////////////////////// Events
@@ -177,12 +177,12 @@ public final class TaskRetryTest {
       return;
     }
 
-    final List<String> executingTasks = getTasksInState(jobStateManager, 
TaskState.State.EXECUTING);
+    final List<String> executingTasks = getTasksInState(planStateManager, 
TaskState.State.EXECUTING);
     if (!executingTasks.isEmpty()) {
       final int randomIndex = random.nextInt(executingTasks.size());
       final String selectedTask = executingTasks.get(randomIndex);
       SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry, selectedTask,
-          TaskState.State.COMPLETE, 
jobStateManager.getTaskAttempt(selectedTask));
+          TaskState.State.COMPLETE, 
planStateManager.getTaskAttempt(selectedTask));
     }
   }
 
@@ -191,30 +191,30 @@ public final class TaskRetryTest {
       return;
     }
 
-    final List<String> executingTasks = getTasksInState(jobStateManager, 
TaskState.State.EXECUTING);
+    final List<String> executingTasks = getTasksInState(planStateManager, 
TaskState.State.EXECUTING);
     if (!executingTasks.isEmpty()) {
       final int randomIndex = random.nextInt(executingTasks.size());
       final String selectedTask = executingTasks.get(randomIndex);
       SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry, selectedTask,
-          TaskState.State.SHOULD_RETRY, 
jobStateManager.getTaskAttempt(selectedTask),
+          TaskState.State.SHOULD_RETRY, 
planStateManager.getTaskAttempt(selectedTask),
           TaskState.RecoverableTaskFailureCause.OUTPUT_WRITE_FAILURE);
     }
   }
 
   ////////////////////////////////////////////////////////////////// Helper 
methods
 
-  private List<String> getTasksInState(final JobStateManager jobStateManager, 
final TaskState.State state) {
-    return jobStateManager.getAllTaskStates().entrySet().stream()
+  private List<String> getTasksInState(final PlanStateManager 
planStateManager, final TaskState.State state) {
+    return planStateManager.getAllTaskStates().entrySet().stream()
         .filter(entry -> 
entry.getValue().getStateMachine().getCurrentState().equals(state))
         .map(Map.Entry::getKey)
         .collect(Collectors.toList());
   }
 
-  private JobStateManager runPhysicalPlan(final TestPlanGenerator.PlanType 
planType) throws Exception {
+  private PlanStateManager runPhysicalPlan(final TestPlanGenerator.PlanType 
planType) throws Exception {
     final MetricMessageHandler metricMessageHandler = 
mock(MetricMessageHandler.class);
     final PhysicalPlan plan = TestPlanGenerator.generatePhysicalPlan(planType, 
false);
-    final JobStateManager jobStateManager = new JobStateManager(plan, 
metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-    return jobStateManager;
+    final PlanStateManager planStateManager = new PlanStateManager(plan, 
metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+    scheduler.schedulePlan(plan, planStateManager);
+    return planStateManager;
   }
 }
diff --git 
a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
 
b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 5742238..3b3646e 100644
--- 
a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++ 
b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -94,7 +94,7 @@ public final class TestPlanGenerator {
   private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> 
irDAG,
                                                   final Policy policy) throws 
Exception {
     final DAG<IRVertex, IREdge> optimized = 
policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
-    final DAG<Stage, StageEdge> physicalDAG = 
optimized.convert(PLAN_GENERATOR);
+    final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
     return new PhysicalPlan("TestPlan", physicalDAG);
   }
 

Reply via email to