jeongyooneo closed pull request #17: [NEMO-66] Move /tests/scheduler into 
/runtime/master/scheduler/test
URL: https://github.com/apache/incubator-nemo/pull/17
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is supplied below:

diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
new file mode 100644
index 00000000..50d24664
--- /dev/null
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.policy;
+
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic pull policy.
+ */
+public final class BasicPullPolicy implements Policy {
+  @Override
+  public List<CompileTimePass> getCompileTimePasses() {
+    List<CompileTimePass> policy = new ArrayList<>();
+    policy.add(new DefaultStagePartitioningPass());
+    policy.add(new ScheduleGroupPass());
+    return policy;
+  }
+
+  @Override
+  public List<RuntimePass<?>> getRuntimePasses() {
+    return new ArrayList<>();
+  }
+}
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
new file mode 100644
index 00000000..201cfa92
--- /dev/null
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.policy;
+
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic push policy.
+ */
+public final class BasicPushPolicy implements Policy {
+  @Override
+  public List<CompileTimePass> getCompileTimePasses() {
+    List<CompileTimePass> policy = new ArrayList<>();
+    policy.add(new DefaultStagePartitioningPass());
+    policy.add(new ShuffleEdgePushPass());
+    policy.add(new ScheduleGroupPass());
+    return policy;
+  }
+
+  @Override
+  public List<RuntimePass<?>> getRuntimePasses() {
+    return new ArrayList<>();
+  }
+}
diff --git a/pom.xml b/pom.xml
index ba0a0991..6f13f595 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@ limitations under the License.
         <module>runtime/executor</module>
         <module>runtime/master</module>
         <module>runtime/driver</module>
+        <module>runtime/plangenerator</module>
         <module>tests</module>
     </modules>
 
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index cf48631c..8b750009 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -106,7 +106,8 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan, 
final Map<String, Lis
         .map(list -> list.stream()
             .map(pair -> pair.left())
             .max(Integer::compareTo)
-            .orElseThrow(() -> new DynamicOptimizationException("Cannot find 
max hash value in a block.")))
+            .<DynamicOptimizationException>orElseThrow(
+                () -> new DynamicOptimizationException("Cannot find max hash 
value in a block.")))
         .max(Integer::compareTo)
         .orElseThrow(() -> new DynamicOptimizationException("Cannot find max 
hash value among blocks."));
 
diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml
index a797767e..c0a69b2b 100644
--- a/runtime/master/pom.xml
+++ b/runtime/master/pom.xml
@@ -46,6 +46,11 @@ limitations under the License.
             <artifactId>nemo-runtime-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>edu.snu.nemo</groupId>
+            <artifactId>nemo-runtime-plangenerator</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
similarity index 66%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
index ab39a968..585cddaa 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
@@ -13,24 +13,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.compiler.frontend.beam.transform.DoTransform;
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
-import edu.snu.nemo.tests.runtime.RuntimeTestUtil;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.*;
@@ -43,9 +30,7 @@
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.runtime.master.scheduler.*;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
+import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
@@ -75,7 +60,6 @@
     PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, 
MetricMessageHandler.class})
 public final class BatchSingleJobSchedulerTest {
   private static final Logger LOG = 
LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName());
-  private DAGBuilder<IRVertex, IREdge> irDAGBuilder;
   private Scheduler scheduler;
   private SchedulingPolicy schedulingPolicy;
   private SchedulerRunner schedulerRunner;
@@ -86,7 +70,6 @@
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
   private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
-  private PhysicalPlanGenerator physicalPlanGenerator;
 
   private static final int EXECUTOR_CAPACITY = 20;
 
@@ -98,7 +81,6 @@ public void setUp() throws Exception {
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
 
-    irDAGBuilder = initializeDAGBuilder();
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
     pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
@@ -139,8 +121,6 @@ public void setUp() throws Exception {
     // Add storage nodes
     scheduler.onExecutorAdded(b1);
     scheduler.onExecutorAdded(b2);
-
-    physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
   }
 
   /**
@@ -149,9 +129,8 @@ public void setUp() throws Exception {
    */
   @Test(timeout=10000)
   public void testPull() throws Exception {
-    final DAG<IRVertex, IREdge> pullIRDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(), "");
-    scheduleAndCheckJobTermination(pullIRDAG);
+    scheduleAndCheckJobTermination(
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false));
   }
 
   /**
@@ -160,68 +139,20 @@ public void testPull() throws Exception {
    */
   @Test(timeout=10000)
   public void testPush() throws Exception {
-    final DAG<IRVertex, IREdge> pushIRDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
-    scheduleAndCheckJobTermination(pushIRDAG);
-  }
-
-  private DAGBuilder<IRVertex, IREdge> initializeDAGBuilder() {
-    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
-
-    final Transform t = new EmptyComponents.EmptyTransform("empty");
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(1));
-    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    dagBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    dagBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(3));
-    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    dagBuilder.addVertex(v3);
-
-    final IRVertex v4 = new OperatorVertex(t);
-    v4.setProperty(ParallelismProperty.of(4));
-    
v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    dagBuilder.addVertex(v4);
-
-    final IRVertex v5 = new OperatorVertex(new DoTransform(null, null));
-    v5.setProperty(ParallelismProperty.of(5));
-    
v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
-    dagBuilder.addVertex(v5);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    dagBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, 
Coder.DUMMY_CODER);
-    dagBuilder.connectVertices(e2);
-
-    final IREdge e4 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, 
Coder.DUMMY_CODER);
-    dagBuilder.connectVertices(e4);
-
-    final IREdge e5 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v5, 
Coder.DUMMY_CODER);
-    dagBuilder.connectVertices(e5);
-
-    return dagBuilder;
+    scheduleAndCheckJobTermination(
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 true));
   }
 
-  private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> 
irDAG) throws InjectionException {
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    final PhysicalPlan plan = new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap());
+  private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws 
InjectionException {
     final JobStateManager jobStateManager = new JobStateManager(plan, 
blockManagerMaster, metricMessageHandler, 1);
     scheduler.scheduleJob(plan, jobStateManager);
 
     // For each ScheduleGroup, test:
     // a) all stages in the ScheduleGroup enters the executing state
     // b) the stages of the next ScheduleGroup are scheduled after the stages 
of each ScheduleGroup are made "complete".
-    for (int i = 0; i < getNumScheduleGroups(irDAG); i++) {
+    for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
       final int scheduleGroupIdx = i;
-      final List<PhysicalStage> stages = 
filterStagesWithAScheduleGroupIndex(physicalDAG, scheduleGroupIdx);
+      final List<PhysicalStage> stages = 
filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
 
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the 
executing state", scheduleGroupIdx);
       stages.forEach(physicalStage -> {
@@ -232,7 +163,7 @@ private void scheduleAndCheckJobTermination(final 
DAG<IRVertex, IREdge> irDAG) t
       });
 
       stages.forEach(physicalStage -> {
-        RuntimeTestUtil.completeStage(
+        SchedulerTestUtil.completeStage(
             jobStateManager, scheduler, executorRegistry, physicalStage, 
MAGIC_SCHEDULE_ATTEMPT_INDEX);
       });
     }
@@ -258,10 +189,9 @@ private void scheduleAndCheckJobTermination(final 
DAG<IRVertex, IREdge> irDAG) t
     return sortedStages;
   }
 
-  private int getNumScheduleGroups(final DAG<IRVertex, IREdge> irDAG) {
+  private int getNumScheduleGroups(final DAG<PhysicalStage, PhysicalStageEdge> 
physicalDAG) {
     final Set<Integer> scheduleGroupSet = new HashSet<>();
-    irDAG.getVertices().forEach(irVertex ->
-        scheduleGroupSet.add((Integer) 
irVertex.getProperty(ExecutionProperty.Key.ScheduleGroupIndex)));
+    physicalDAG.getVertices().forEach(stage -> 
scheduleGroupSet.add(stage.getScheduleGroupIndex()));
     return scheduleGroupSet.size();
   }
 }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
similarity index 98%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 1f9462a8..b95c2a83 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
similarity index 72%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
index f6419247..bd171244 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
@@ -13,24 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.common.coder.Coder;
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
-import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
-import edu.snu.nemo.tests.runtime.RuntimeTestUtil;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.*;
@@ -41,8 +30,7 @@
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
-import edu.snu.nemo.runtime.master.scheduler.*;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
+import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
@@ -89,7 +77,6 @@
   private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
   private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
   private final ExecutorService serExecutorService = 
Executors.newSingleThreadExecutor();
-  private PhysicalPlanGenerator physicalPlanGenerator;
 
   private static final int MAX_SCHEDULE_ATTEMPT = 3;
 
@@ -101,9 +88,6 @@ public void setUp() throws Exception {
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
 
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
-    physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
   }
 
   private void setUpExecutors(final Collection<ExecutorRepresenter> executors,
@@ -129,53 +113,6 @@ private void setUpExecutors(final 
Collection<ExecutorRepresenter> executors,
     }
   }
 
-  private PhysicalPlan buildPlan() throws Exception {
-    // Build DAG
-    final Transform t = new EmptyComponents.EmptyTransform("empty");
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(3));
-    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v3);
-
-    final IRVertex v4 = new OperatorVertex(t);
-    v4.setProperty(ParallelismProperty.of(2));
-    
v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v4);
-
-    final IRVertex v5 = new OperatorVertex(t);
-    v5.setProperty(ParallelismProperty.of(2));
-    
v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v5);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final IREdge e3 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e3);
-
-    final IREdge e4 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e4);
-
-    final DAG<IRVertex, IREdge> irDAG =
-        
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), new 
TestPolicy(), "");
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    return new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap());
-  }
-
   /**
    * Tests fault tolerance after a container removal.
    */
@@ -197,7 +134,9 @@ public void testContainerRemoval() throws Exception {
     executors.add(a3);
 
     setUpExecutors(executors, true);
-    final PhysicalPlan plan = buildPlan();
+    final PhysicalPlan plan =
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
+
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
@@ -208,16 +147,16 @@ public void testContainerRemoval() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
-              taskGroupId, TaskGroupState.State.COMPLETE, 1));
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+                taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         scheduler.onExecutorRemoved("a3");
         // There are 2 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         // Due to round robin scheduling, "a2" is assured to have a running 
TaskGroup.
@@ -228,16 +167,16 @@ public void testContainerRemoval() throws Exception {
         }
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 
2);
 
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
-              taskGroupId, TaskGroupState.State.COMPLETE, 1));
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+                taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else {
         // There are 1 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 2.
         // Schedule only the first TaskGroup
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
             executorRegistry, true);
       }
     }
@@ -264,7 +203,8 @@ public void testOutputFailure() throws Exception {
     executors.add(a3);
     setUpExecutors(executors, true);
 
-    final PhysicalPlan plan = buildPlan();
+    final PhysicalPlan plan =
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
@@ -275,21 +215,21 @@ public void testOutputFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
-              taskGroupId, TaskGroupState.State.COMPLETE, 1));
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+                taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
-              taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
-              TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+                taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
+                TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
 
         while 
(jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState()
 != EXECUTING) {
 
@@ -326,7 +266,8 @@ public void testInputReadFailure() throws Exception {
     executors.add(a3);
     setUpExecutors(executors, true);
 
-    final PhysicalPlan plan = buildPlan();
+    final PhysicalPlan plan =
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
@@ -337,21 +278,21 @@ public void testInputReadFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
-              taskGroupId, TaskGroupState.State.COMPLETE, 1));
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+                taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
-              taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
-              TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+                taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
+                TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
 
         while 
(jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState()
 != EXECUTING) {
 
@@ -371,7 +312,7 @@ public void testInputReadFailure() throws Exception {
    */
   @Test(timeout=10000)
   public void testTaskGroupReexecutionForFailure() throws Exception {
-  final ActiveContext activeContext = mock(ActiveContext.class);
+    final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
 
     final ResourceSpecification computeSpec = new 
ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
@@ -388,7 +329,10 @@ public void testTaskGroupReexecutionForFailure() throws 
Exception {
 
     setUpExecutors(executors, false);
 
-    final PhysicalPlan plan = buildPlan();
+    final PhysicalPlan plan =
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
+
+
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
 
@@ -403,11 +347,11 @@ public void testTaskGroupReexecutionForFailure() throws 
Exception {
         final Set<String> a3RunningTaskGroups = new 
HashSet<>(a3.getRunningTaskGroups());
 
         a1RunningTaskGroups.forEach(taskGroupId ->
-            RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
                 taskGroupId, TaskGroupState.State.COMPLETE, 1));
 
         a3RunningTaskGroups.forEach(taskGroupId ->
-            RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
                 taskGroupId, TaskGroupState.State.COMPLETE, 1));
       }
     }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
similarity index 98%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
index 5308ab6a..162a1402 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
similarity index 93%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
index 6061fc06..04d30759 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -13,12 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
new file mode 100644
index 00000000..e3b1abba
--- /dev/null
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.StageState;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for scheduler unit tests.
+ */
+public final class SchedulerTestUtil {
+  /**
+   * Complete the stage by completing all of its TaskGroups.
+   * @param jobStateManager for the submitted job.
+   * @param scheduler for the submitted job.
+   * @param executorRegistry provides executor representers
+   * @param physicalStage for which the states should be marked as complete.
+   */
+  public static void completeStage(final JobStateManager jobStateManager,
+                                   final Scheduler scheduler,
+                                   final ExecutorRegistry executorRegistry,
+                                   final PhysicalStage physicalStage,
+                                   final int attemptIdx) {
+    // Loop until the stage completes.
+    while (true) {
+      final Enum stageState = 
jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
+      if (StageState.State.COMPLETE == stageState) {
+        // Stage has completed, so we break out of the loop.
+        break;
+      } else if (StageState.State.EXECUTING == stageState) {
+        physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
+          final Enum tgState = 
jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState();
+          if (TaskGroupState.State.EXECUTING == tgState) {
+            sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId,
+                TaskGroupState.State.COMPLETE, attemptIdx, null);
+          } else if (TaskGroupState.State.READY == tgState || 
TaskGroupState.State.COMPLETE == tgState) {
+            // Skip READY (try in the next loop and see if it becomes 
EXECUTING) and COMPLETE.
+          } else {
+            throw new IllegalStateException(tgState.toString());
+          }
+        });
+      } else if (StageState.State.READY == stageState) {
+        // Skip and retry in the next loop.
+      } else {
+        throw new IllegalStateException(stageState.toString());
+      }
+    }
+  }
+
+  /**
+   * Sends task group state change event to scheduler.
+   * This replaces executor's task group completion messages for testing 
purposes.
+   * @param scheduler for the submitted job.
+   * @param executorRegistry provides executor representers
+   * @param taskGroupId for the task group to change the state.
+   * @param newState for the task group.
+   * @param cause in the case of a recoverable failure.
+   */
+  public static void sendTaskGroupStateEventToScheduler(final Scheduler 
scheduler,
+                                                        final ExecutorRegistry 
executorRegistry,
+                                                        final String 
taskGroupId,
+                                                        final 
TaskGroupState.State newState,
+                                                        final int attemptIdx,
+                                                        final 
TaskGroupState.RecoverableFailureCause cause) {
+    ExecutorRepresenter scheduledExecutor;
+    do {
+      scheduledExecutor = findExecutorForTaskGroup(executorRegistry, 
taskGroupId);
+    } while (scheduledExecutor == null);
+
+    scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), 
taskGroupId,
+        newState, attemptIdx, null, cause);
+  }
+
+  public static void sendTaskGroupStateEventToScheduler(final Scheduler 
scheduler,
+                                                        final ExecutorRegistry 
executorRegistry,
+                                                        final String 
taskGroupId,
+                                                        final 
TaskGroupState.State newState,
+                                                        final int attemptIdx) {
+    sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId, newState, attemptIdx, null);
+  }
+
+  public static void mockSchedulerRunner(final PendingTaskGroupCollection 
pendingTaskGroupCollection,
+                                         final SchedulingPolicy 
schedulingPolicy,
+                                         final JobStateManager jobStateManager,
+                                         final ExecutorRegistry 
executorRegistry,
+                                         final boolean isPartialSchedule) {
+    while (!pendingTaskGroupCollection.isEmpty()) {
+      final ScheduledTaskGroup taskGroupToSchedule = 
pendingTaskGroupCollection.remove(
+          
pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
+
+      final Set<ExecutorRepresenter> runningExecutorRepresenter =
+          executorRegistry.getRunningExecutorIds().stream()
+              .map(executorId -> 
executorRegistry.getExecutorRepresenter(executorId))
+              .collect(Collectors.toSet());
+      final Set<ExecutorRepresenter> candidateExecutors =
+          
schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, 
taskGroupToSchedule);
+      if (candidateExecutors.size() > 0) {
+        
jobStateManager.onTaskGroupStateChanged(taskGroupToSchedule.getTaskGroupId(),
+            TaskGroupState.State.EXECUTING);
+        final ExecutorRepresenter executor = 
candidateExecutors.stream().findFirst().get();
+        executor.onTaskGroupScheduled(taskGroupToSchedule);
+      }
+
+      // Schedule only the first task group.
+      if (isPartialSchedule) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Retrieves the executor to which the given task group was scheduled.
+   * @param taskGroupId of the task group to search.
+   * @param executorRegistry provides executor representers
+   * @return the {@link ExecutorRepresenter} of the executor the task group 
was scheduled to, or {@code null} if no running executor lists it as running or complete.
+   */
+  private static ExecutorRepresenter findExecutorForTaskGroup(final 
ExecutorRegistry executorRegistry,
+                                                              final String 
taskGroupId) {
+    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
+      final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
+      if (executor.getRunningTaskGroups().contains(taskGroupId)
+          || executor.getCompleteTaskGroups().contains(taskGroupId)) {
+        return executor;
+      }
+    }
+    return null;
+  }
+}
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java
similarity index 59%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java
index 41bdb4dc..2e86dbbf 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java
@@ -13,26 +13,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
-
-import edu.snu.nemo.common.coder.Coder;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.conf.JobConf;
+package edu.snu.nemo.runtime.master.scheduler;
+
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.master.scheduler.SingleJobTaskGroupCollection;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
+import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -46,15 +31,12 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests {@link SingleJobTaskGroupCollection}.
  */
 public final class SingleTaskGroupQueueTest {
-  private DAGBuilder<IRVertex, IREdge> irDAGBuilder;
   private SingleJobTaskGroupCollection pendingTaskGroupPriorityQueue;
-  private PhysicalPlanGenerator physicalPlanGenerator;
 
   /**
    * To be used for a thread pool to execute task groups.
@@ -63,13 +45,8 @@
 
   @Before
   public void setUp() throws Exception{
-    irDAGBuilder = new DAGBuilder<>();
     pendingTaskGroupPriorityQueue = new SingleJobTaskGroupCollection();
     executorService = Executors.newFixedThreadPool(2);
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
-    physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
   }
 
   /**
@@ -78,37 +55,11 @@ public void setUp() throws Exception{
    */
   @Test
   public void testPushPriority() throws Exception {
-    final Transform t = mock(Transform.class);
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(2));
-    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v3);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
+    final PhysicalPlan physicalPlan =
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices,
 true);
 
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    pendingTaskGroupPriorityQueue.onJobScheduled(
-        new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap()));
-
-    final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort();
+    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    final List<PhysicalStage> dagOf2Stages = 
physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's 
requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 
dagOf2Stages.get(1).getScheduleGroupIndex());
@@ -161,37 +112,10 @@ public void testPushPriority() throws Exception {
    */
   @Test
   public void testPullPriority() throws Exception {
-    final Transform t = mock(Transform.class);
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(2));
-    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v3);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(), "");
-
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    pendingTaskGroupPriorityQueue.onJobScheduled(
-        new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap()));
-
-    final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort();
+    final PhysicalPlan physicalPlan =
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices,
 false);
+    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    final List<PhysicalStage> dagOf2Stages = 
physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's 
requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0);
@@ -241,37 +165,10 @@ public void testPullPriority() throws Exception {
    */
   @Test
   public void testWithDifferentContainerType() throws Exception {
-    final Transform t = mock(Transform.class);
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(2));
-    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
-    irDAGBuilder.addVertex(v3);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
-
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    pendingTaskGroupPriorityQueue.onJobScheduled(
-        new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap()));
-
-    final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort();
+    final PhysicalPlan physicalPlan = TestPlanGenerator.generatePhysicalPlan(
+        
TestPlanGenerator.PlanType.ThreeSequentialVerticesWithDifferentContainerTypes, 
true);
+    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    final List<PhysicalStage> dagOf2Stages = 
physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's 
requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 
dagOf2Stages.get(1).getScheduleGroupIndex());
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
similarity index 97%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 810bb313..6d5fc43a 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -13,13 +13,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
-import 
edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git a/runtime/plangenerator/pom.xml b/runtime/plangenerator/pom.xml
new file mode 100644
index 00000000..26ced4fc
--- /dev/null
+++ b/runtime/plangenerator/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (C) 2017 Seoul National University
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>edu.snu.nemo</groupId>
+        <artifactId>nemo-project</artifactId>
+        <version>0.1-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+
+    <artifactId>nemo-runtime-plangenerator</artifactId>
+    <name>Nemo Runtime Test Plans</name>
+
+    <repositories>
+        <repository>
+            <id>Bundled Maven Repository</id>
+            
<url>file://${basedir}/../../common/src/main/resources/repository</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>edu.snu.nemo</groupId>
+            <artifactId>nemo-runtime-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>edu.snu.nemo</groupId>
+            <artifactId>nemo-compiler-optimizer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
 
b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
new file mode 100644
index 00000000..d856dd05
--- /dev/null
+++ 
b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.plangenerator;
+
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
+import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
+import edu.snu.nemo.compiler.optimizer.policy.BasicPullPolicy;
+import edu.snu.nemo.compiler.optimizer.policy.BasicPushPolicy;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+/**
+ * Generates physical plans for testing purposes.
+ */
+public final class TestPlanGenerator {
+  private static final PhysicalPlanGenerator PLAN_GENERATOR;
+  private static final String EMPTY_DAG_DIRECTORY = "";
+
+  static {
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileParameter(JobConf.DAGDirectory.class, 
EMPTY_DAG_DIRECTORY);
+    try {
+      PLAN_GENERATOR = injector.getInstance(PhysicalPlanGenerator.class);
+    } catch (InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Type of the plan to generate.
+   */
+  public enum PlanType {
+    TwoVerticesJoined,
+    ThreeSequentialVertices,
+    ThreeSequentialVerticesWithDifferentContainerTypes
+  }
+
+  /**
+   * private constructor.
+   */
+  private TestPlanGenerator() {
+  }
+
+  /**
+   * @param planType type of the plan to generate.
+   * @param isPush whether to use the push policy.
+   * @return the generated plan.
+   * @throws Exception exception.
+   */
+  public static PhysicalPlan generatePhysicalPlan(final PlanType planType, 
final boolean isPush) throws Exception {
+    final Policy policy = isPush ? new BasicPushPolicy() : new 
BasicPullPolicy();
+    switch (planType) {
+      case TwoVerticesJoined:
+        return convertIRToPhysical(getTwoVerticesJoinedDAG(), policy);
+      case ThreeSequentialVertices:
+        return convertIRToPhysical(getThreeSequentialVerticesDAG(true), 
policy);
+      case ThreeSequentialVerticesWithDifferentContainerTypes:
+        return convertIRToPhysical(getThreeSequentialVerticesDAG(false), 
policy);
+      default:
+        throw new IllegalArgumentException(planType.toString());
+    }
+  }
+
+  /**
+   * @param irDAG irDAG.
+   * @param policy policy.
+   * @return the physical plan converted from the given IR DAG using the given policy.
+   * @throws Exception exception.
+   */
+  private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> 
irDAG,
+                                                  final Policy policy) throws 
Exception {
+    final DAG<IRVertex, IREdge> optimized = 
CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
+    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
optimized.convert(PLAN_GENERATOR);
+    return new PhysicalPlan("Plan", physicalDAG, 
PLAN_GENERATOR.getTaskIRVertexMap());
+  }
+
+  /**
+   * @return a dag that joins two vertices.
+   */
+  private static DAG<IRVertex, IREdge> getTwoVerticesJoinedDAG() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+    final Transform t = new EmptyComponents.EmptyTransform("empty");
+    final IRVertex v1 = new OperatorVertex(t);
+    v1.setProperty(ParallelismProperty.of(3));
+    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v1);
+
+    final IRVertex v2 = new OperatorVertex(t);
+    v2.setProperty(ParallelismProperty.of(2));
+    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v2);
+
+    final IRVertex v3 = new OperatorVertex(t);
+    v3.setProperty(ParallelismProperty.of(3));
+    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v3);
+
+    final IRVertex v4 = new OperatorVertex(t);
+    v4.setProperty(ParallelismProperty.of(2));
+    
v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v4);
+
+    final IRVertex v5 = new OperatorVertex(t);
+    v5.setProperty(ParallelismProperty.of(2));
+    
v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v5);
+
+    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e1);
+
+    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e2);
+
+    final IREdge e3 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e3);
+
+    final IREdge e4 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e4);
+
+    return dagBuilder.buildWithoutSourceSinkCheck();
+  }
+
+  /**
+   * @param sameContainerType whether all three vertices are of the same 
container type
+   * @return a dag with 3 sequential vertices.
+   */
+  private static DAG<IRVertex, IREdge> getThreeSequentialVerticesDAG(final 
boolean sameContainerType) {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+    final Transform t = new EmptyComponents.EmptyTransform("empty");
+    final IRVertex v1 = new OperatorVertex(t);
+    v1.setProperty(ParallelismProperty.of(3));
+    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v1);
+
+    final IRVertex v2 = new OperatorVertex(t);
+    v2.setProperty(ParallelismProperty.of(2));
+    if (sameContainerType) {
+      
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    } else {
+      
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
+    }
+    dagBuilder.addVertex(v2);
+
+    final IRVertex v3 = new OperatorVertex(t);
+    v3.setProperty(ParallelismProperty.of(2));
+    if (sameContainerType) {
+      
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    } else {
+      
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
+    }
+    dagBuilder.addVertex(v3);
+
+    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e1);
+
+    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e2);
+
+    return dagBuilder.buildWithoutSourceSinkCheck();
+  }
+}
+
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
index 49d5d4dc..fbe00d27 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
@@ -15,15 +15,6 @@
  */
 package edu.snu.nemo.tests.runtime;
 
-import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.StageState;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
-import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection;
-import edu.snu.nemo.runtime.master.scheduler.Scheduler;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 
 import java.util.*;
@@ -34,123 +25,6 @@
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  /**
-   * Complete the stage by completing all of its TaskGroups.
-   * @param jobStateManager for the submitted job.
-   * @param scheduler for the submitted job.
-   * @param executorRegistry provides executor representers
-   * @param physicalStage for which the states should be marked as complete.
-   */
-  public static void completeStage(final JobStateManager jobStateManager,
-                                   final Scheduler scheduler,
-                                   final ExecutorRegistry executorRegistry,
-                                   final PhysicalStage physicalStage,
-                                   final int attemptIdx) {
-    // Loop until the stage completes.
-    while (true) {
-      final Enum stageState = 
jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
-      if (StageState.State.COMPLETE == stageState) {
-        // Stage has completed, so we break out of the loop.
-        break;
-      } else if (StageState.State.EXECUTING == stageState) {
-        physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-          final Enum tgState = 
jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState();
-          if (TaskGroupState.State.EXECUTING == tgState) {
-            sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId,
-                TaskGroupState.State.COMPLETE, attemptIdx, null);
-          } else if (TaskGroupState.State.READY == tgState || 
TaskGroupState.State.COMPLETE == tgState) {
-            // Skip READY (try in the next loop and see if it becomes 
EXECUTING) and COMPLETE.
-          } else {
-            throw new IllegalStateException(tgState.toString());
-          }
-        });
-      } else if (StageState.State.READY == stageState) {
-        // Skip and retry in the next loop.
-      } else {
-        throw new IllegalStateException(stageState.toString());
-      }
-    }
-  }
-
-  /**
-   * Sends task group state change event to scheduler.
-   * This replaces executor's task group completion messages for testing 
purposes.
-   * @param scheduler for the submitted job.
-   * @param executorRegistry provides executor representers
-   * @param taskGroupId for the task group to change the state.
-   * @param newState for the task group.
-   * @param cause in the case of a recoverable failure.
-   */
-  public static void sendTaskGroupStateEventToScheduler(final Scheduler 
scheduler,
-                                                        final ExecutorRegistry 
executorRegistry,
-                                                        final String 
taskGroupId,
-                                                        final 
TaskGroupState.State newState,
-                                                        final int attemptIdx,
-                                                        final 
TaskGroupState.RecoverableFailureCause cause) {
-    ExecutorRepresenter scheduledExecutor;
-    do {
-      scheduledExecutor = findExecutorForTaskGroup(executorRegistry, 
taskGroupId);
-    } while (scheduledExecutor == null);
-
-    scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), 
taskGroupId,
-        newState, attemptIdx, null, cause);
-  }
-
-  public static void sendTaskGroupStateEventToScheduler(final Scheduler 
scheduler,
-                                                        final ExecutorRegistry 
executorRegistry,
-                                                        final String 
taskGroupId,
-                                                        final 
TaskGroupState.State newState,
-                                                        final int attemptIdx) {
-    sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId, newState, attemptIdx, null);
-  }
-
-  public static void mockSchedulerRunner(final PendingTaskGroupCollection 
pendingTaskGroupCollection,
-                                         final SchedulingPolicy 
schedulingPolicy,
-                                         final JobStateManager jobStateManager,
-                                         final ExecutorRegistry 
executorRegistry,
-                                         final boolean isPartialSchedule) {
-    while (!pendingTaskGroupCollection.isEmpty()) {
-      final ScheduledTaskGroup taskGroupToSchedule = 
pendingTaskGroupCollection.remove(
-          
pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
-
-      final Set<ExecutorRepresenter> runningExecutorRepresenter =
-          executorRegistry.getRunningExecutorIds().stream()
-              .map(executorId -> 
executorRegistry.getExecutorRepresenter(executorId))
-              .collect(Collectors.toSet());
-      final Set<ExecutorRepresenter> candidateExecutors =
-          
schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, 
taskGroupToSchedule);
-      if (candidateExecutors.size() > 0) {
-        
jobStateManager.onTaskGroupStateChanged(taskGroupToSchedule.getTaskGroupId(),
-            TaskGroupState.State.EXECUTING);
-        final ExecutorRepresenter executor = 
candidateExecutors.stream().findFirst().get();
-        executor.onTaskGroupScheduled(taskGroupToSchedule);
-      }
-
-      // Schedule only the first task group.
-      if (isPartialSchedule) {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Retrieves the executor to which the given task group was scheduled.
-   * @param taskGroupId of the task group to search.
-   * @param executorRegistry provides executor representers
-   * @return the {@link ExecutorRepresenter} of the executor the task group 
was scheduled to.
-   */
-  private static ExecutorRepresenter findExecutorForTaskGroup(final 
ExecutorRegistry executorRegistry,
-                                                              final String 
taskGroupId) {
-    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
-      final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
-      if (executor.getRunningTaskGroups().contains(taskGroupId)
-          || executor.getCompleteTaskGroups().contains(taskGroupId)) {
-        return executor;
-      }
-    }
-    return null;
-  }
-
   /**
    * Gets a list of integer pair elements in range.
    * @param start value of the range (inclusive).


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to