wonook closed pull request #57: [NEMO-73,75] SchedulingPolicy as Vertex-level 
Execution Property
URL: https://github.com/apache/incubator-nemo/pull/57
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java
 
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java
new file mode 100644
index 00000000..ceebd861
--- /dev/null
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.executionproperty;
+
+import java.lang.annotation.*;
+
+/**
+ * Declares associated {@link ExecutionProperty} for implementations.
+ */
+@Target({ElementType.TYPE})
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+public @interface AssociatedProperty {
+  Class<? extends ExecutionProperty> value();
+}
diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java
 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java
new file mode 100644
index 00000000..357be21d
--- /dev/null
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * This property decides whether or not to comply to slot restrictions when 
scheduling this vertex.
+ */
+public final class ExecutorSlotComplianceProperty extends 
VertexExecutionProperty<Boolean> {
+  private static final ExecutorSlotComplianceProperty COMPLIANCE_TRUE = new 
ExecutorSlotComplianceProperty(true);
+  private static final ExecutorSlotComplianceProperty COMPLIANCE_FALSE
+      = new ExecutorSlotComplianceProperty(false);
+
+  /**
+   * Default constructor.
+   *
+   * @param value value of the ExecutionProperty
+   */
+  private ExecutorSlotComplianceProperty(final boolean value) {
+    super(value);
+  }
+
+  /**
+   * Static method getting execution property.
+   *
+   * @param value value of the new execution property
+   * @return the execution property
+   */
+  public static ExecutorSlotComplianceProperty of(final boolean value) {
+    return value ? COMPLIANCE_TRUE : COMPLIANCE_FALSE;
+  }
+}
diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
similarity index 76%
rename from 
common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
rename to 
common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
index 08518ff9..3b521cbb 100644
--- 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java
@@ -18,14 +18,14 @@
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
- * ScheduleGroupIndex ExecutionProperty.
+ * ScheduleGroup ExecutionProperty.
  */
-public final class ScheduleGroupIndexProperty extends 
VertexExecutionProperty<Integer> {
+public final class ScheduleGroupProperty extends 
VertexExecutionProperty<Integer> {
   /**
    * Constructor.
    * @param value value of the execution property.
    */
-  private ScheduleGroupIndexProperty(final Integer value) {
+  private ScheduleGroupProperty(final Integer value) {
     super(value);
   }
 
@@ -34,7 +34,7 @@ private ScheduleGroupIndexProperty(final Integer value) {
    * @param value value of the new execution property.
    * @return the newly created execution property.
    */
-  public static ScheduleGroupIndexProperty of(final Integer value) {
-    return new ScheduleGroupIndexProperty(value);
+  public static ScheduleGroupProperty of(final Integer value) {
+    return new ScheduleGroupProperty(value);
   }
 }
diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java
 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java
new file mode 100644
index 00000000..cad987da
--- /dev/null
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * This property decides whether or not to schedule this vertex only on 
executors where source data reside.
+ */
+public final class SourceLocationAwareSchedulingProperty extends 
VertexExecutionProperty<Boolean> {
+  private static final SourceLocationAwareSchedulingProperty SOURCE_TRUE
+      = new SourceLocationAwareSchedulingProperty(true);
+  private static final SourceLocationAwareSchedulingProperty SOURCE_FALSE
+      = new SourceLocationAwareSchedulingProperty(false);
+
+  /**
+   * Default constructor.
+   *
+   * @param value value of the ExecutionProperty
+   */
+  private SourceLocationAwareSchedulingProperty(final boolean value) {
+    super(value);
+  }
+
+  /**
+   * Static method getting execution property.
+   *
+   * @param value value of the new execution property
+   * @return the execution property
+   */
+  public static SourceLocationAwareSchedulingProperty of(final boolean value) {
+    return value ? SOURCE_TRUE : SOURCE_FALSE;
+  }
+}
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 38f964cb..ca788bdf 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -23,7 +23,7 @@
 import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import org.apache.commons.lang3.mutable.MutableInt;
 
 import java.util.*;
@@ -71,7 +71,7 @@ public DefaultScheduleGroupPass() {
   public DefaultScheduleGroupPass(final boolean 
allowBroadcastWithinScheduleGroup,
                                   final boolean 
allowShuffleWithinScheduleGroup,
                                   final boolean 
allowMultipleInEdgesWithinScheduleGroup) {
-    super(ScheduleGroupIndexProperty.class, Stream.of(
+    super(ScheduleGroupProperty.class, Stream.of(
         DataCommunicationPatternProperty.class,
         DataFlowModelProperty.class
     ).collect(Collectors.toSet()));
@@ -93,7 +93,7 @@ public DefaultScheduleGroupPass(final boolean 
allowBroadcastWithinScheduleGroup,
         newScheduleGroup.vertices.add(irVertex);
         irVertexToScheduleGroupMap.put(irVertex, newScheduleGroup);
       }
-      // Get scheduleGroupIndex
+      // Get scheduleGroup
       final ScheduleGroup scheduleGroup = 
irVertexToScheduleGroupMap.get(irVertex);
       if (scheduleGroup == null) {
         throw new RuntimeException(String.format("ScheduleGroup must be set 
for %s", irVertex));
@@ -203,42 +203,42 @@ public DefaultScheduleGroupPass(final boolean 
allowBroadcastWithinScheduleGroup,
       }
     });
 
-    // Assign ScheduleGroupIndex property based on topology of ScheduleGroups
-    final MutableInt currentScheduleGroupIndex = new 
MutableInt(getNextScheudleGroupIndex(dag.getVertices()));
+    // Assign ScheduleGroup property based on topology of ScheduleGroups
+    final MutableInt currentScheduleGroup = new 
MutableInt(getNextScheudleGroup(dag.getVertices()));
     final DAGBuilder<ScheduleGroup, ScheduleGroupEdge> scheduleGroupDAGBuilder 
= new DAGBuilder<>();
     scheduleGroups.forEach(scheduleGroupDAGBuilder::addVertex);
     scheduleGroups.forEach(src -> src.scheduleGroupsTo
         .forEach(dst -> scheduleGroupDAGBuilder.connectVertices(new 
ScheduleGroupEdge(src, dst))));
     scheduleGroupDAGBuilder.build().topologicalDo(scheduleGroup -> {
-      boolean usedCurrentIndex = false;
+      boolean usedCurrentScheduleGroup = false;
       for (final IRVertex irVertex : scheduleGroup.vertices) {
-        if 
(!irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).isPresent()) {
-          
irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(currentScheduleGroupIndex.getValue()));
-          usedCurrentIndex = true;
+        if 
(!irVertex.getPropertyValue(ScheduleGroupProperty.class).isPresent()) {
+          
irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
+          usedCurrentScheduleGroup = true;
         }
       }
-      if (usedCurrentIndex) {
-        currentScheduleGroupIndex.increment();
+      if (usedCurrentScheduleGroup) {
+        currentScheduleGroup.increment();
       }
     });
     return dag;
   }
 
   /**
-   * Determines the range of {@link ScheduleGroupIndexProperty} value that 
will prevent collision
-   * with the existing {@link ScheduleGroupIndexProperty}.
+   * Determines the range of {@link ScheduleGroupProperty} value that will 
prevent collision
+   * with the existing {@link ScheduleGroupProperty}.
    * @param irVertexCollection collection of {@link IRVertex}
-   * @return the minimum value for the {@link ScheduleGroupIndexProperty} that 
won't collide with the existing values
+   * @return the minimum value for the {@link ScheduleGroupProperty} that 
won't collide with the existing values
    */
-  private int getNextScheudleGroupIndex(final Collection<IRVertex> 
irVertexCollection) {
-    int nextScheduleGroupIndex = 0;
+  private int getNextScheudleGroup(final Collection<IRVertex> 
irVertexCollection) {
+    int nextScheduleGroup = 0;
     for (final IRVertex irVertex : irVertexCollection) {
-      final Optional<Integer> scheduleGroupIndex = 
irVertex.getPropertyValue(ScheduleGroupIndexProperty.class);
-      if (scheduleGroupIndex.isPresent()) {
-        nextScheduleGroupIndex = Math.max(scheduleGroupIndex.get() + 1, 
nextScheduleGroupIndex);
+      final Optional<Integer> scheduleGroup = 
irVertex.getPropertyValue(ScheduleGroupProperty.class);
+      if (scheduleGroup.isPresent()) {
+        nextScheduleGroup = Math.max(scheduleGroup.get() + 1, 
nextScheduleGroup);
       }
     }
-    return nextScheduleGroupIndex;
+    return nextScheduleGroup;
   }
 
   /**
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java
new file mode 100644
index 00000000..e1b2123d
--- /dev/null
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+
+/**
+ * Sets {@link ExecutorSlotComplianceProperty}.
+ */
+public final class ExecutorSlotCompliancePass extends AnnotatingPass {
+
+  public ExecutorSlotCompliancePass() {
+    super(ExecutorSlotComplianceProperty.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    // On every vertex, if ExecutorSlotComplianceProperty is not set, put it 
as true.
+    dag.getVertices().stream()
+        .filter(v -> 
!v.getExecutionProperties().containsKey(ExecutorSlotComplianceProperty.class))
+        .forEach(v -> 
v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(true)));
+    return dag;
+  }
+}
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java
new file mode 100644
index 00000000..8c30d95d
--- /dev/null
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
+
+/**
+ * Sets {@link SourceLocationAwareSchedulingProperty}.
+ */
+public final class SourceLocationAwareSchedulingPass extends AnnotatingPass {
+
+  public SourceLocationAwareSchedulingPass() {
+    super(SourceLocationAwareSchedulingProperty.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    // On every vertex, if SourceLocationAwareSchedulingProperty is not set, 
put it as true.
+    dag.getVertices().stream()
+        .filter(v -> 
!v.getExecutionProperties().containsKey(SourceLocationAwareSchedulingProperty.class))
+        .forEach(v -> 
v.getExecutionProperties().put(SourceLocationAwareSchedulingProperty.of(true)));
+    return dag;
+  }
+}
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
index 3a5c80c9..a30abec1 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java
@@ -37,7 +37,9 @@ public PrimitiveCompositePass() {
         new DefaultEdgeUsedDataHandlingPass(),
         new DefaultScheduleGroupPass(),
         new CompressionPass(),
-        new DecompressionPass()
+        new DecompressionPass(),
+        new SourceLocationAwareSchedulingPass(),
+        new ExecutorSlotCompliancePass()
     ));
   }
 }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 107a6231..b9b97959 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -24,7 +24,7 @@
 import edu.snu.nemo.common.ir.vertex.*;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
@@ -226,8 +226,8 @@ private void handleDuplicateEdgeGroupProperty(final 
DAG<Stage, StageEdge> dagOfS
   private void integrityCheck(final Stage stage) {
     stage.getPropertyValue(ParallelismProperty.class)
         .orElseThrow(() -> new RuntimeException("Parallelism property must be 
set for Stage"));
-    stage.getPropertyValue(ScheduleGroupIndexProperty.class)
-        .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property 
must be set for Stage"));
+    stage.getPropertyValue(ScheduleGroupProperty.class)
+        .orElseThrow(() -> new RuntimeException("ScheduleGroup property must 
be set for Stage"));
 
     stage.getIRDAG().getVertices().forEach(irVertex -> {
       // Check vertex type.
@@ -241,24 +241,24 @@ private void integrityCheck(final Stage stage) {
 
   /**
    * Split ScheduleGroups by Pull {@link StageEdge}s, and ensure topological 
ordering of
-   * {@link ScheduleGroupIndexProperty}.
+   * {@link ScheduleGroupProperty}.
    *
    * @param dag {@link DAG} of {@link Stage}s to manipulate
    */
   private void splitScheduleGroupByPullStageEdges(final DAG<Stage, StageEdge> 
dag) {
-    final MutableInt nextScheduleGroupIndex = new MutableInt(0);
-    final Map<Stage, Integer> stageToScheduleGroupIndexMap = new HashMap<>();
+    final MutableInt nextScheduleGroup = new MutableInt(0);
+    final Map<Stage, Integer> stageToScheduleGroupMap = new HashMap<>();
 
     dag.topologicalDo(currentStage -> {
-      // Base case: assign New ScheduleGroupIndex of the Stage
-      stageToScheduleGroupIndexMap.computeIfAbsent(currentStage, s -> 
getAndIncrement(nextScheduleGroupIndex));
+      // Base case: assign New ScheduleGroup of the Stage
+      stageToScheduleGroupMap.computeIfAbsent(currentStage, s -> 
getAndIncrement(nextScheduleGroup));
 
       for (final StageEdge stageEdgeFromCurrentStage : 
dag.getOutgoingEdgesOf(currentStage)) {
         final Stage destination = stageEdgeFromCurrentStage.getDst();
-        // Skip if some Stages that destination depends on do not have 
assigned new ScheduleGroupIndex
+        // Skip if some Stages that destination depends on do not have 
assigned new ScheduleGroup
         boolean skip = false;
         for (final StageEdge stageEdgeToDestination : 
dag.getIncomingEdgesOf(destination)) {
-          if 
(!stageToScheduleGroupIndexMap.containsKey(stageEdgeToDestination.getSrc())) {
+          if 
(!stageToScheduleGroupMap.containsKey(stageEdgeToDestination.getSrc())) {
             skip = true;
             break;
           }
@@ -266,42 +266,42 @@ private void splitScheduleGroupByPullStageEdges(final 
DAG<Stage, StageEdge> dag)
         if (skip) {
           continue;
         }
-        if (stageToScheduleGroupIndexMap.containsKey(destination)) {
+        if (stageToScheduleGroupMap.containsKey(destination)) {
           continue;
         }
 
         // Find any non-pull inEdge
-        Integer scheduleGroupIndex = null;
-        Integer newScheduleGroupIndex = null;
+        Integer scheduleGroup = null;
+        Integer newScheduleGroup = null;
         for (final StageEdge stageEdge : dag.getIncomingEdgesOf(destination)) {
           final Stage source = stageEdge.getSrc();
           if (stageEdge.getDataFlowModel() != 
DataFlowModelProperty.Value.Pull) {
-            if (scheduleGroupIndex != null && source.getScheduleGroupIndex() 
!= scheduleGroupIndex) {
+            if (scheduleGroup != null && source.getScheduleGroup() != 
scheduleGroup) {
               throw new RuntimeException(String.format("Multiple Push inEdges 
from different ScheduleGroup: %d, %d",
-                  scheduleGroupIndex, source.getScheduleGroupIndex()));
+                  scheduleGroup, source.getScheduleGroup()));
             }
-            if (source.getScheduleGroupIndex() != 
destination.getScheduleGroupIndex()) {
+            if (source.getScheduleGroup() != destination.getScheduleGroup()) {
               throw new RuntimeException(String.format("Split ScheduleGroup by 
push StageEdge: %d, %d",
-                  source.getScheduleGroupIndex(), 
destination.getScheduleGroupIndex()));
+                  source.getScheduleGroup(), destination.getScheduleGroup()));
             }
-            scheduleGroupIndex = source.getScheduleGroupIndex();
-            newScheduleGroupIndex = stageToScheduleGroupIndexMap.get(source);
+            scheduleGroup = source.getScheduleGroup();
+            newScheduleGroup = stageToScheduleGroupMap.get(source);
           }
         }
 
-        if (newScheduleGroupIndex == null) {
-          stageToScheduleGroupIndexMap.put(destination, 
getAndIncrement(nextScheduleGroupIndex));
+        if (newScheduleGroup == null) {
+          stageToScheduleGroupMap.put(destination, 
getAndIncrement(nextScheduleGroup));
         } else {
-          stageToScheduleGroupIndexMap.put(destination, newScheduleGroupIndex);
+          stageToScheduleGroupMap.put(destination, newScheduleGroup);
         }
       }
     });
 
     dag.topologicalDo(stage -> {
-      final int scheduleGroupIndex = stageToScheduleGroupIndexMap.get(stage);
-      
stage.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+      final int scheduleGroup = stageToScheduleGroupMap.get(stage);
+      
stage.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
       stage.getIRDAG().topologicalDo(vertex -> vertex.getExecutionProperties()
-          .put(ScheduleGroupIndexProperty.of(scheduleGroupIndex)));
+          .put(ScheduleGroupProperty.of(scheduleGroup)));
     });
   }
 
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index 9e5da38b..c2abcc4e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -22,7 +22,7 @@
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.commons.lang3.SerializationUtils;
 
@@ -94,11 +94,11 @@ public int getParallelism() {
   }
 
   /**
-   * @return the schedule group index.
+   * @return the schedule group.
    */
-  public int getScheduleGroupIndex() {
-    return executionProperties.get(ScheduleGroupIndexProperty.class)
-        .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property 
must be set for Stage"));
+  public int getScheduleGroup() {
+    return executionProperties.get(ScheduleGroupProperty.class)
+        .orElseThrow(() -> new RuntimeException("ScheduleGroup property must 
be set for Stage"));
   }
 
   /**
@@ -130,7 +130,7 @@ public int getScheduleGroupIndex() {
   @Override
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("{\"scheduleGroupIndex\": ").append(getScheduleGroupIndex());
+    sb.append("{\"scheduleGroup\": ").append(getScheduleGroup());
     sb.append(", \"irDag\": ").append(irDag);
     sb.append(", \"parallelism\": ").append(getParallelism());
     sb.append(", \"executionProperties\": ").append(executionProperties);
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 846bccab..c2919a32 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -23,7 +23,7 @@
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.Pair;
@@ -131,9 +131,10 @@ public void setUp() throws InjectionException {
     final MetricMessageHandler metricMessageHandler = 
mock(MetricMessageHandler.class);
     final PubSubEventHandlerWrapper pubSubEventHandler = 
mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
-    final SchedulingPolicy schedulingPolicy = 
injector.getInstance(CompositeSchedulingPolicy.class);
+    final SchedulingConstraintRegistry schedulingConstraint = 
injector.getInstance(SchedulingConstraintRegistry.class);
+    final SchedulingPolicy schedulingPolicy = 
injector.getInstance(SchedulingPolicy.class);
     final PendingTaskCollectionPointer taskQueue = new 
PendingTaskCollectionPointer();
-    final SchedulerRunner schedulerRunner = new 
SchedulerRunner(schedulingPolicy, taskQueue, executorRegistry);
+    final SchedulerRunner schedulerRunner = new 
SchedulerRunner(schedulingConstraint, schedulingPolicy, taskQueue, 
executorRegistry);
     final Scheduler scheduler = new BatchSingleJobScheduler(
         schedulerRunner, taskQueue, master, pubSubEventHandler, 
updatePhysicalPlanEventHandler, executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
@@ -549,7 +550,7 @@ private Stage setupStages(final String stageId) {
 
     final ExecutionPropertyMap<VertexExecutionProperty> stageExecutionProperty 
= new ExecutionPropertyMap<>(stageId);
     stageExecutionProperty.put(ParallelismProperty.of(PARALLELISM_TEN));
-    stageExecutionProperty.put(ScheduleGroupIndexProperty.of(0));
+    stageExecutionProperty.put(ScheduleGroupProperty.of(0));
     return new Stage(stageId, emptyDag, stageExecutionProperty, 
Collections.emptyList());
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 4bd8d4fb..40094243 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -114,7 +114,7 @@ public void scheduleJob(final PhysicalPlan 
physicalPlanOfJob, final JobStateMana
     LOG.info("Job to schedule: {}", physicalPlanOfJob.getId());
 
     this.initialScheduleGroup = 
physicalPlanOfJob.getStageDAG().getVertices().stream()
-        .mapToInt(stage -> stage.getScheduleGroupIndex())
+        .mapToInt(stage -> stage.getScheduleGroup())
         .min().getAsInt();
 
     scheduleNextScheduleGroup(initialScheduleGroup);
@@ -213,7 +213,7 @@ public void onExecutorRemoved(final String executorId) {
       // Schedule a stage after marking the necessary tasks to 
failed_recoverable.
       // The stage for one of the tasks that failed is a starting point to look
       // for the next stage to be scheduled.
-      scheduleNextScheduleGroup(getSchedulingIndexOfStage(
+      scheduleNextScheduleGroup(getScheduleGroupOfStage(
           
RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next())));
     }
   }
@@ -245,7 +245,7 @@ private void scheduleNextScheduleGroup(final int 
referenceIndex) {
 
   /**
    * Selects the next stage to schedule.
-   * It takes the referenceScheduleGroupIndex as a reference point to begin 
looking for the stages to execute:
+   * It takes the referenceScheduleGroup as a reference point to begin looking 
for the stages to execute:
    *
    * a) returns the failed_recoverable stage(s) of the earliest schedule 
group, if it(they) exists.
    * b) returns an empty optional if there are no schedulable stages at the 
moment.
@@ -253,15 +253,15 @@ private void scheduleNextScheduleGroup(final int 
referenceIndex) {
    *    - if an ancestor schedule group is still executing
    * c) returns the next set of schedulable stages (if the current schedule 
group has completed execution)
    *
-   * @param referenceScheduleGroupIndex
+   * @param referenceScheduleGroup
    *      the index of the schedule group that is executing/has executed when 
this method is called.
    * @return an optional of the (possibly empty) next schedulable stage
    */
-  private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int 
referenceScheduleGroupIndex) {
+  private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int 
referenceScheduleGroup) {
     // Recursively check the previous schedule group.
-    if (referenceScheduleGroupIndex > initialScheduleGroup) {
+    if (referenceScheduleGroup > initialScheduleGroup) {
       final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
-          selectNextScheduleGroupToSchedule(referenceScheduleGroupIndex - 1);
+          selectNextScheduleGroupToSchedule(referenceScheduleGroup - 1);
       if (ancestorStagesFromAScheduleGroup.isPresent()) {
         // Nothing to schedule from the previous schedule group.
         return ancestorStagesFromAScheduleGroup;
@@ -277,7 +277,7 @@ private void scheduleNextScheduleGroup(final int 
referenceIndex) {
     // All previous schedule groups are complete, we need to check for the 
current schedule group.
     final List<Stage> currentScheduleGroup = reverseTopoStages
         .stream()
-        .filter(stage -> stage.getScheduleGroupIndex() == 
referenceScheduleGroupIndex)
+        .filter(stage -> stage.getScheduleGroup() == referenceScheduleGroup)
         .collect(Collectors.toList());
     final boolean allStagesOfThisGroupComplete = currentScheduleGroup
         .stream()
@@ -286,7 +286,7 @@ private void scheduleNextScheduleGroup(final int 
referenceIndex) {
         .allMatch(state -> state.equals(StageState.State.COMPLETE));
 
     if (!allStagesOfThisGroupComplete) {
-      LOG.info("There are remaining stages in the current schedule group, {}", 
referenceScheduleGroupIndex);
+      LOG.info("There are remaining stages in the current schedule group, {}", 
referenceScheduleGroup);
       final List<Stage> stagesToSchedule = currentScheduleGroup
           .stream()
           .filter(stage -> {
@@ -304,7 +304,7 @@ private void scheduleNextScheduleGroup(final int 
referenceIndex) {
       final List<Stage> stagesToSchedule = reverseTopoStages
           .stream()
           .filter(stage -> {
-            if (stage.getScheduleGroupIndex() == referenceScheduleGroupIndex + 
1) {
+            if (stage.getScheduleGroup() == referenceScheduleGroup + 1) {
               final String stageId = stage.getId();
               return jobStateManager.getStageState(stageId) != 
StageState.State.EXECUTING
                   && jobStateManager.getStageState(stageId) != 
StageState.State.COMPLETE;
@@ -314,7 +314,7 @@ private void scheduleNextScheduleGroup(final int 
referenceIndex) {
           .collect(Collectors.toList());
 
       if (stagesToSchedule.isEmpty()) {
-        LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip 
this", referenceScheduleGroupIndex + 1);
+        LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip 
this", referenceScheduleGroup + 1);
         return Optional.empty();
       }
 
@@ -433,7 +433,7 @@ private void onTaskExecutionComplete(final String 
executorId,
     if 
(jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE))
 {
       // if the stage this task belongs to is complete,
       if (!jobStateManager.isJobDone()) {
-        
scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageIdForTaskUponCompletion));
+        
scheduleNextScheduleGroup(getScheduleGroupOfStage(stageIdForTaskUponCompletion));
       }
     }
     schedulerRunner.onAnExecutorAvailable();
@@ -502,7 +502,7 @@ private void onTaskExecutionFailedRecoverable(final String 
executorId,
         // TODO #50: Carefully retry tasks in the scheduler
       case OUTPUT_WRITE_FAILURE:
         blockManagerMaster.onProducerTaskFailed(taskId);
-        scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageId));
+        scheduleNextScheduleGroup(getScheduleGroupOfStage(stageId));
         break;
       case CONTAINER_FAILURE:
         LOG.info("Only the failed task will be retried.");
@@ -513,7 +513,7 @@ private void onTaskExecutionFailedRecoverable(final String 
executorId,
     schedulerRunner.onAnExecutorAvailable();
   }
 
-  private int getSchedulingIndexOfStage(final String stageId) {
-    return 
physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroupIndex();
+  private int getScheduleGroupOfStage(final String stageId) {
+    return 
physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroup();
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
deleted file mode 100644
index 213f5f19..00000000
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.runtime.common.plan.Task;
-import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-
-import javax.inject.Inject;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Temporary class to implement stacked scheduling policy.
- * At now, policies are injected through Tang, but have to be configurable by 
users
- * when Nemo supports job-wide execution property.
- * TODO #69: Support job-wide execution property.
- */
-public final class CompositeSchedulingPolicy implements SchedulingPolicy {
-  private final List<SchedulingPolicy> schedulingPolicies;
-
-  @Inject
-  private CompositeSchedulingPolicy(final SourceLocationAwareSchedulingPolicy 
sourceLocationAwareSchedulingPolicy,
-                                    final MinOccupancyFirstSchedulingPolicy 
minOccupancyFirstSchedulingPolicy,
-                                    final FreeSlotSchedulingPolicy 
freeSlotSchedulingPolicy,
-                                    final ContainerTypeAwareSchedulingPolicy 
containerTypeAwareSchedulingPolicy) {
-    schedulingPolicies = Arrays.asList(
-        freeSlotSchedulingPolicy,
-        containerTypeAwareSchedulingPolicy,
-        sourceLocationAwareSchedulingPolicy,
-        minOccupancyFirstSchedulingPolicy);
-  }
-
-  @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
-    Set<ExecutorRepresenter> candidates = executorRepresenterSet;
-    for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
-      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, 
task);
-    }
-    return candidates;
-  }
-}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
similarity index 52%
rename from 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
rename to 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
index 5f64e9c3..a91e996e 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java
@@ -16,45 +16,29 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * This policy find executors which has corresponding container type.
  */
-public final class ContainerTypeAwareSchedulingPolicy implements 
SchedulingPolicy {
+@AssociatedProperty(ExecutorPlacementProperty.class)
+public final class ContainerTypeAwareSchedulingConstraint implements 
SchedulingConstraint {
 
   @VisibleForTesting
   @Inject
-  public ContainerTypeAwareSchedulingPolicy() {
+  public ContainerTypeAwareSchedulingConstraint() {
   }
 
-  /**
-   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be 
filtered by the container type.
-   *                               If the container type of target Task is 
NONE, it will return the original set.
-   * @param task {@link Task} to be scheduled.
-   * @return filtered Set of {@link ExecutorRepresenter}.
-   */
   @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
-
+  public boolean testSchedulability(final ExecutorRepresenter executor, final 
Task task) {
     final String executorPlacementPropertyValue = 
task.getPropertyValue(ExecutorPlacementProperty.class)
         .orElse(ExecutorPlacementProperty.NONE);
-    if (executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE)) 
{
-      return executorRepresenterSet;
-    }
-
-    final Set<ExecutorRepresenter> candidateExecutors =
-        executorRepresenterSet.stream()
-            .filter(executor -> 
executor.getContainerType().equals(executorPlacementPropertyValue))
-            .collect(Collectors.toSet());
-
-    return candidateExecutors;
+    return 
executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE) ? true
+        : executor.getContainerType().equals(executorPlacementPropertyValue);
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
similarity index 50%
rename from 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
rename to 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
index d92d9d46..1fc1f6e6 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
@@ -16,36 +16,29 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * This policy finds executor that has free slot for a Task.
  */
-public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
+@AssociatedProperty(ExecutorSlotComplianceProperty.class)
+public final class FreeSlotSchedulingConstraint implements 
SchedulingConstraint {
   @VisibleForTesting
   @Inject
-  public FreeSlotSchedulingPolicy() {
+  public FreeSlotSchedulingConstraint() {
   }
 
-  /**
-   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be 
filtered by the free slot of executors.
-   *                               Executors that do not have any free slots 
will be filtered by this policy.
-   * @param task {@link Task} to be scheduled.
-   * @return filtered Set of {@link ExecutorRepresenter}.
-   */
   @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
-    final Set<ExecutorRepresenter> candidateExecutors =
-        executorRepresenterSet.stream()
-            .filter(executor -> executor.getRunningTasks().size() < 
executor.getExecutorCapacity())
-            .collect(Collectors.toSet());
+  public boolean testSchedulability(final ExecutorRepresenter executor, final 
Task task) {
+    if 
(!task.getPropertyValue(ExecutorSlotComplianceProperty.class).orElse(false)) {
+      return true;
+    }
 
-    return candidateExecutors;
+    return executor.getRunningTasks().size() < executor.getExecutorCapacity();
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
index 120e1fb0..e53f659a 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
@@ -26,12 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.stream.Collectors;
-
 /**
- * {@inheritDoc}
- * A scheduling policy used by {@link BatchSingleJobScheduler}.
- *
  * This policy chooses a set of Executors, on which have minimum running Tasks.
  */
 @ThreadSafe
@@ -44,28 +39,20 @@
   public MinOccupancyFirstSchedulingPolicy() {
   }
 
-  /**
-   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be 
filtered by the occupancy of the Executors.
-   * @param task {@link Task} to be scheduled.
-   * @return filtered Set of {@link ExecutorRepresenter}.
-   */
   @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
+  public ExecutorRepresenter selectExecutor(final 
Collection<ExecutorRepresenter> executors, final Task task) {
     final OptionalInt minOccupancy =
-        executorRepresenterSet.stream()
+        executors.stream()
         .map(executor -> executor.getRunningTasks().size())
         .mapToInt(i -> i).min();
 
     if (!minOccupancy.isPresent()) {
-      return Collections.emptySet();
+      throw new RuntimeException("Cannot find min occupancy");
     }
 
-    final Set<ExecutorRepresenter> candidateExecutors =
-        executorRepresenterSet.stream()
+    return executors.stream()
         .filter(executor -> executor.getRunningTasks().size() == 
minOccupancy.getAsInt())
-        .collect(Collectors.toSet());
-
-    return candidateExecutors;
+        .findFirst()
+        .orElseThrow(() -> new RuntimeException("No such executor"));
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 62af0408..42e9a2c4 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -20,6 +20,7 @@
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.reef.annotations.audience.DriverSide;
 
 import java.util.*;
@@ -28,6 +29,7 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,12 +54,14 @@
   private boolean isTerminated;
 
   private final DelayedSignalingCondition schedulingIteration = new 
DelayedSignalingCondition();
-  private ExecutorRegistry executorRegistry;
-  private SchedulingPolicy schedulingPolicy;
+  private final ExecutorRegistry executorRegistry;
+  private final SchedulingConstraintRegistry schedulingConstraintRegistry;
+  private final SchedulingPolicy schedulingPolicy;
 
   @VisibleForTesting
   @Inject
-  public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
+  public SchedulerRunner(final SchedulingConstraintRegistry 
schedulingConstraintRegistry,
+                         final SchedulingPolicy schedulingPolicy,
                          final PendingTaskCollectionPointer 
pendingTaskCollectionPointer,
                          final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
@@ -67,6 +71,7 @@ public SchedulerRunner(final SchedulingPolicy 
schedulingPolicy,
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
     this.schedulingPolicy = schedulingPolicy;
+    this.schedulingConstraintRegistry = schedulingConstraintRegistry;
   }
 
   /**
@@ -111,16 +116,23 @@ void doScheduleTaskList() {
 
       LOG.debug("Trying to schedule {}...", task.getTaskId());
       executorRegistry.viewExecutors(executors -> {
-        final Set<ExecutorRepresenter> candidateExecutors =
-            schedulingPolicy.filterExecutorRepresenters(executors, task);
-        final Optional<ExecutorRepresenter> firstCandidate = 
candidateExecutors.stream().findFirst();
-
-        if (firstCandidate.isPresent()) {
+        final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new 
MutableObject<>(executors);
+        task.getExecutionProperties().forEachProperties(property -> {
+          final Optional<SchedulingConstraint> constraint = 
schedulingConstraintRegistry.get(property.getClass());
+          if (constraint.isPresent() && 
!candidateExecutors.getValue().isEmpty()) {
+            candidateExecutors.setValue(candidateExecutors.getValue().stream()
+                .filter(e -> constraint.get().testSchedulability(e, task))
+                .collect(Collectors.toSet()));
+          }
+        });
+        if (!candidateExecutors.getValue().isEmpty()) {
+          // Select executor
+          final ExecutorRepresenter selectedExecutor
+              = schedulingPolicy.selectExecutor(candidateExecutors.getValue(), 
task);
           // update metadata first
           jobStateManager.onTaskStateChanged(task.getTaskId(), 
TaskState.State.EXECUTING);
 
           // send the task
-          final ExecutorRepresenter selectedExecutor = firstCandidate.get();
           selectedExecutor.onTaskScheduled(task);
         } else {
           couldNotSchedule.add(task);
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
new file mode 100644
index 00000000..7e713441
--- /dev/null
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Functions to test schedulability with a pair of an executor and a task.
+ */
+@DriverSide
+@ThreadSafe
+@FunctionalInterface
+public interface SchedulingConstraint {
+  boolean testSchedulability(final ExecutorRepresenter executor, final Task 
task);
+}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
new file mode 100644
index 00000000..97ab5544
--- /dev/null
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry for {@link SchedulingConstraint}.
+ */
+@DriverSide
+@ThreadSafe
+public final class SchedulingConstraintRegistry {
+  private final Map<Type, SchedulingConstraint> typeToSchedulingConstraintMap 
= new ConcurrentHashMap<>();
+
+  @Inject
+  private SchedulingConstraintRegistry(
+      final ContainerTypeAwareSchedulingConstraint 
containerTypeAwareSchedulingConstraint,
+      final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
+      final SourceLocationAwareSchedulingConstraint 
sourceLocationAwareSchedulingConstraint) {
+    registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
+    registerSchedulingConstraint(freeSlotSchedulingConstraint);
+    registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
+  }
+
+  /**
+   * Registers a {@link SchedulingConstraint}.
+   * @param policy the policy to register
+   */
+  public void registerSchedulingConstraint(final SchedulingConstraint policy) {
+    final AssociatedProperty associatedProperty = 
policy.getClass().getAnnotation(AssociatedProperty.class);
+    if (associatedProperty == null || associatedProperty.value() == null) {
+      throw new RuntimeException(String.format("SchedulingConstraint %s has no 
associated VertexExecutionProperty",
+          policy.getClass()));
+    }
+    final Class<? extends ExecutionProperty> property = 
associatedProperty.value();
+    if (typeToSchedulingConstraintMap.putIfAbsent(property, policy) != null) {
+      throw new RuntimeException(String.format("Multiple SchedulingConstraint 
for VertexExecutionProperty %s:"
+          + "%s, %s", property, typeToSchedulingConstraintMap.get(property), 
policy));
+    }
+  }
+
+  /**
+   * Returns {@link SchedulingConstraint} for the given {@link 
VertexExecutionProperty}.
+   * @param propertyClass {@link VertexExecutionProperty} class
+   * @return the corresponding {@link SchedulingConstraint} object,
+   *         or {@link Optional#EMPTY} if no such policy was found
+   */
+  public Optional<SchedulingConstraint> get(final Class<? extends 
VertexExecutionProperty> propertyClass) {
+    return 
Optional.ofNullable(typeToSchedulingConstraintMap.get(propertyClass));
+  }
+}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 0064310c..95288e7e 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -17,20 +17,27 @@
 
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
-import javax.annotation.concurrent.ThreadSafe;
-import java.util.Set;
+import java.util.Collection;
 
 /**
- * (WARNING) Implementations of this interface must be thread-safe.
+ * A function to select an executor from collection of available executors.
  */
 @DriverSide
 @ThreadSafe
 @FunctionalInterface
-@DefaultImplementation(CompositeSchedulingPolicy.class)
+@DefaultImplementation(MinOccupancyFirstSchedulingPolicy.class)
 public interface SchedulingPolicy {
-  Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
-                                                      final Task task);
+  /**
+   * A function to select an executor from the specified collection of 
available executors.
+   *
+   * @param executors The collection of available executors.
+   *                  Implementations can assume that the collection is not 
empty.
+   * @param task The task to schedule
+   * @return The selected executor. It must be a member of {@code executors}.
+   */
+  ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> 
executors, final Task task);
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
similarity index 67%
rename from 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
rename to 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f84f8a85..f18c9008 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -17,6 +17,8 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -26,21 +28,21 @@
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.stream.Collectors;
 
 /**
  * This policy is same as {@link MinOccupancyFirstSchedulingPolicy}, however 
for Tasks
  * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
- * where the corresponding data resides.
+ * where the corresponding data reside.
  */
 @ThreadSafe
 @DriverSide
-public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+@AssociatedProperty(SourceLocationAwareSchedulingProperty.class)
+public final class SourceLocationAwareSchedulingConstraint implements 
SchedulingConstraint {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingConstraint.class);
 
   @VisibleForTesting
   @Inject
-  public SourceLocationAwareSchedulingPolicy() {
+  public SourceLocationAwareSchedulingConstraint() {
   }
 
   /**
@@ -56,33 +58,21 @@ public SourceLocationAwareSchedulingPolicy() {
     return new HashSet<>(sourceLocations);
   }
 
-  /**
-   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be 
filtered by source location.
-   *                               If there is no source locations, will 
return original set.
-   * @param task {@link Task} to be scheduled.
-   * @return filtered Set of {@link ExecutorRepresenter}.
-   */
   @Override
-  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final Task task) {
+  public boolean testSchedulability(final ExecutorRepresenter executor, final 
Task task) {
     final Set<String> sourceLocations;
     try {
       sourceLocations = 
getSourceLocations(task.getIrVertexIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
-      return executorRepresenterSet;
+      return true;
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
 
     if (sourceLocations.size() == 0) {
-      return executorRepresenterSet;
+      return true;
     }
 
-    final Set<ExecutorRepresenter> candidateExecutors =
-            executorRepresenterSet.stream()
-            .filter(executor -> 
sourceLocations.contains(executor.getNodeName()))
-            .collect(Collectors.toSet());
-
-    return candidateExecutors;
+    return sourceLocations.contains(executor.getNodeName());
   }
 }
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index ba8a2f5e..8bfe43d2 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -63,6 +63,7 @@
 public final class BatchSingleJobSchedulerTest {
   private static final Logger LOG = 
LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName());
   private Scheduler scheduler;
+  private SchedulingConstraintRegistry schedulingConstraint;
   private SchedulingPolicy schedulingPolicy;
   private SchedulerRunner schedulerRunner;
   private ExecutorRegistry executorRegistry;
@@ -86,8 +87,9 @@ public void setUp() throws Exception {
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
     pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
-    schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskCollectionPointer, executorRegistry);
+    schedulingConstraint = 
injector.getInstance(SchedulingConstraintRegistry.class);
+    schedulingPolicy = injector.getInstance(SchedulingPolicy.class);
+    schedulerRunner = new SchedulerRunner(schedulingConstraint, 
schedulingPolicy, pendingTaskCollectionPointer, executorRegistry);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
@@ -154,7 +156,7 @@ private void scheduleAndCheckJobTermination(final 
PhysicalPlan plan) throws Inje
     // b) the stages of the next ScheduleGroup are scheduled after the stages 
of each ScheduleGroup are made "complete".
     for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
       final int scheduleGroupIdx = i;
-      final List<Stage> stages = 
filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
+      final List<Stage> stages = 
filterStagesWithAScheduleGroup(plan.getStageDAG(), scheduleGroupIdx);
 
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the 
executing state", scheduleGroupIdx);
       stages.forEach(stage -> {
@@ -175,10 +177,10 @@ private void scheduleAndCheckJobTermination(final 
PhysicalPlan plan) throws Inje
     assertTrue(jobStateManager.isJobDone());
   }
 
-  private List<Stage> filterStagesWithAScheduleGroupIndex(
-      final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroupIndex) {
+  private List<Stage> filterStagesWithAScheduleGroup(
+      final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroup) {
     final Set<Stage> stageSet = new HashSet<>(physicalDAG.filterVertices(
-        stage -> stage.getScheduleGroupIndex() == scheduleGroupIndex));
+        stage -> stage.getScheduleGroup() == scheduleGroup));
 
     // Return the filtered vertices as a sorted list
     final List<Stage> sortedStages = new ArrayList<>(stageSet.size());
@@ -192,7 +194,7 @@ private void scheduleAndCheckJobTermination(final 
PhysicalPlan plan) throws Inje
 
   private int getNumScheduleGroups(final DAG<Stage, StageEdge> physicalDAG) {
     final Set<Integer> scheduleGroupSet = new HashSet<>();
-    physicalDAG.getVertices().forEach(stage -> 
scheduleGroupSet.add(stage.getScheduleGroupIndex()));
+    physicalDAG.getVertices().forEach(stage -> 
scheduleGroupSet.add(stage.getScheduleGroup()));
     return scheduleGroupSet.size();
   }
 }
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
similarity index 78%
rename from 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
index 7a2d1490..4aa2ecca 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java
@@ -24,16 +24,17 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
 /**
- * Tests {@link ContainerTypeAwareSchedulingPolicy}.
+ * Tests {@link ContainerTypeAwareSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class})
-public final class ContainerTypeAwareSchedulingPolicyTest {
+public final class ContainerTypeAwareSchedulingConstraintTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final String 
containerType) {
     final ExecutorRepresenter executorRepresenter = 
mock(ExecutorRepresenter.class);
@@ -43,7 +44,7 @@ private static ExecutorRepresenter 
mockExecutorRepresenter(final String containe
 
   @Test
   public void testContainerTypeAware() {
-    final SchedulingPolicy schedulingPolicy = new 
ContainerTypeAwareSchedulingPolicy();
+    final SchedulingConstraint schedulingConstraint = new 
ContainerTypeAwareSchedulingConstraint();
     final ExecutorRepresenter a0 = 
mockExecutorRepresenter(ExecutorPlacementProperty.TRANSIENT);
     final ExecutorRepresenter a1 = 
mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
     final ExecutorRepresenter a2 = 
mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
@@ -54,10 +55,11 @@ public void testContainerTypeAware() {
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new 
HashSet<>(Arrays.asList(a0, a1, a2));
 
-    final Set<ExecutorRepresenter> candidateExecutors1 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, 
task1);
+    final Set<ExecutorRepresenter> candidateExecutors1 = 
executorRepresenterList1.stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task1))
+        .collect(Collectors.toSet());;
 
-    final Set<ExecutorRepresenter> expectedExecutors1 = new 
HashSet<>(Arrays.asList(a1));
+    final Set<ExecutorRepresenter> expectedExecutors1 = 
Collections.singleton(a1);
     assertEquals(expectedExecutors1, candidateExecutors1);
 
     final Task task2 = mock(Task.class);
@@ -66,8 +68,9 @@ public void testContainerTypeAware() {
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new 
HashSet<>(Arrays.asList(a0, a1, a2));
 
-    final Set<ExecutorRepresenter> candidateExecutors2 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, 
task2);
+    final Set<ExecutorRepresenter> candidateExecutors2 = 
executorRepresenterList2.stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task2))
+        .collect(Collectors.toSet());
 
     final Set<ExecutorRepresenter> expectedExecutors2 = new 
HashSet<>(Arrays.asList(a0, a1, a2));
     assertEquals(expectedExecutors2, candidateExecutors2);
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
deleted file mode 100644
index 62b51812..00000000
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.comm.ControlMessage;
-import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
-import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Function;
-
-import static edu.snu.nemo.runtime.common.state.StageState.State.COMPLETE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests fault tolerance.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class,
-    PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, 
MetricMessageHandler.class})
-public final class FaultToleranceTest {
-  private static final Logger LOG = 
LoggerFactory.getLogger(FaultToleranceTest.class.getName());
-
-  private SchedulingPolicy schedulingPolicy;
-  private SchedulerRunner schedulerRunner;
-  private ExecutorRegistry executorRegistry;
-
-  private MetricMessageHandler metricMessageHandler;
-  private PendingTaskCollectionPointer pendingTaskCollectionPointer;
-  private PubSubEventHandlerWrapper pubSubEventHandler;
-  private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
-  private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
-  private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
-  private final ExecutorService serExecutorService = 
Executors.newSingleThreadExecutor();
-
-  private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE;
-
-  @Before
-  public void setUp() throws Exception {
-    metricMessageHandler = mock(MetricMessageHandler.class);
-    pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
-    updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
-
-  }
-
-  private Scheduler setUpScheduler(final boolean useMockSchedulerRunner) 
throws InjectionException {
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    executorRegistry = injector.getInstance(ExecutorRegistry.class);
-
-    pendingTaskCollectionPointer = new PendingTaskCollectionPointer();
-    schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-
-    if (useMockSchedulerRunner) {
-      schedulerRunner = mock(SchedulerRunner.class);
-    } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskCollectionPointer, executorRegistry);
-    }
-    return new BatchSingleJobScheduler(schedulerRunner, 
pendingTaskCollectionPointer, blockManagerMaster,
-        pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
-  }
-
-  /**
-   * Tests fault tolerance after a container removal.
-   */
-  @Test(timeout=50000)
-  public void testContainerRemoval() throws Exception {
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ResourceSpecification computeSpec = new 
ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = 
executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, 
activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    executors.add(a1);
-    executors.add(a2);
-    executors.add(a3);
-
-    final Scheduler scheduler = setUpScheduler(true);
-    for (final ExecutorRepresenter executor : executors) {
-      scheduler.onExecutorAdded(executor);
-    }
-
-    final PhysicalPlan plan =
-        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
-
-    final JobStateManager jobStateManager =
-        new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-
-    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
-    for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() 
== 1) {
-
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in 
ScheduleGroup 0 and 1.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 2) {
-        scheduler.onExecutorRemoved("a3");
-        // There are 2 executors, each of capacity 2, and there are 2 Tasks in 
ScheduleGroup 2.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        // Due to round robin scheduling, "a2" is assured to have a running 
Task.
-        scheduler.onExecutorRemoved("a2");
-
-        // Re-schedule
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
-            .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
-        assertTrue(maxTaskAttempt.isPresent());
-        assertEquals(2, (int) maxTaskAttempt.get());
-
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 3) {
-        // There are 1 executors, each of capacity 2, and there are 2 Tasks in 
ScheduleGroup 3.
-        // Schedule only the first Task
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, true);
-      } else {
-        throw new RuntimeException(String.format("Unexpected 
ScheduleGroupIndex: %d",
-            stage.getScheduleGroupIndex()));
-      }
-    }
-  }
-
-  /**
-   * Tests fault tolerance after an output write failure.
-   */
-  @Test(timeout=50000)
-  public void testOutputFailure() throws Exception {
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ResourceSpecification computeSpec = new 
ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = 
executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, 
activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    executors.add(a1);
-    executors.add(a2);
-    executors.add(a3);
-    final Scheduler scheduler = setUpScheduler(true);
-    for (final ExecutorRepresenter executor : executors) {
-      scheduler.onExecutorAdded(executor);
-    }
-
-    final PhysicalPlan plan =
-        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-
-    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
-    for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() 
== 1) {
-
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in 
ScheduleGroup 0 and 1.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 2) {
-        // There are 3 executors, each of capacity 2, and there are 2 Tasks in 
ScheduleGroup 2.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
-                taskId, TaskState.State.FAILED_RECOVERABLE, 1,
-                TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
-
-        // Re-schedule
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
-            .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
-        assertTrue(maxTaskAttempt.isPresent());
-        assertEquals(2, (int) maxTaskAttempt.get());
-
-        stage.getTaskIds().forEach(taskId ->
-            assertEquals(TaskState.State.EXECUTING, 
jobStateManager.getTaskState(taskId)));
-      }
-    }
-  }
-
-  /**
-   * Tests fault tolerance after an input read failure.
-   */
-  @Test(timeout=50000)
-  public void testInputReadFailure() throws Exception {
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ResourceSpecification computeSpec = new 
ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = 
executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, 
activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    executors.add(a1);
-    executors.add(a2);
-    executors.add(a3);
-    final Scheduler scheduler = setUpScheduler(true);
-    for (final ExecutorRepresenter executor : executors) {
-      scheduler.onExecutorAdded(executor);
-    }
-
-    final PhysicalPlan plan =
-        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-
-    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
-    for (final Stage stage : dagOf4Stages) {
-      if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() 
== 1) {
-
-        // There are 3 executors, each of capacity 2, and there are 6 Tasks in 
ScheduleGroup 0 and 1.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
-                taskId, TaskState.State.COMPLETE, 1));
-      } else if (stage.getScheduleGroupIndex() == 2) {
-        // There are 3 executors, each of capacity 2, and there are 2 Tasks in 
ScheduleGroup 2.
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        stage.getTaskIds().forEach(taskId ->
-            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, 
executorRegistry,
-                taskId, TaskState.State.FAILED_RECOVERABLE, 1,
-                TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
-
-        // Re-schedule
-        
SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, 
schedulingPolicy, jobStateManager,
-            executorRegistry, false);
-
-        final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream()
-            .map(jobStateManager::getTaskAttempt).max(Integer::compareTo);
-        assertTrue(maxTaskAttempt.isPresent());
-        assertEquals(2, (int) maxTaskAttempt.get());
-
-        stage.getTaskIds().forEach(taskId ->
-            assertEquals(TaskState.State.EXECUTING, 
jobStateManager.getTaskState(taskId)));
-      }
-    }
-  }
-
-  /**
-   * Tests the rescheduling of Tasks upon a failure.
-   */
-  @Test(timeout=200000)
-  public void testTaskReexecutionForFailure() throws Exception {
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ResourceSpecification computeSpec = new 
ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = 
executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, 
activeContext, serExecutorService, executorId);
-
-    final Scheduler scheduler = setUpScheduler(false);
-    final PhysicalPlan plan =
-        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
-    final JobStateManager jobStateManager =
-        new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
-    scheduler.scheduleJob(plan, jobStateManager);
-
-    final List<ExecutorRepresenter> executors = new ArrayList<>();
-    final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
-
-    int executorIdIndex = 1;
-    float removalChance = 0.5f; // Out of 1.0
-    final Random random = new Random(0); // Deterministic seed.
-
-    for (final Stage stage : dagOf4Stages) {
-
-      while (jobStateManager.getStageState(stage.getId()) != COMPLETE) {
-        // By chance, remove or add executor
-        if (isTrueByChance(random, removalChance)) {
-          // REMOVE EXECUTOR
-          if (!executors.isEmpty()) {
-            
scheduler.onExecutorRemoved(executors.remove(random.nextInt(executors.size())).getExecutorId());
-          } else {
-            // Skip, since no executor is running.
-          }
-        } else {
-          if (executors.size() < 3) {
-            // ADD EXECUTOR
-            final ExecutorRepresenter newExecutor = 
executorRepresenterGenerator.apply("a" + executorIdIndex);
-            executorIdIndex += 1;
-            executors.add(newExecutor);
-            scheduler.onExecutorAdded(newExecutor);
-          } else {
-            // Skip, in order to keep the total number of running executors 
below or equal to 3
-          }
-        }
-
-        // Complete the execution of tasks
-        if (!executors.isEmpty()) {
-          final int indexOfCompletedExecutor = 
random.nextInt(executors.size());
-          // New set for snapshotting
-          final Map<String, Integer> runningTaskSnapshot =
-              new 
HashMap<>(executors.get(indexOfCompletedExecutor).getRunningTaskToAttempt());
-          runningTaskSnapshot.entrySet().forEach(entry -> {
-            SchedulerTestUtil.sendTaskStateEventToScheduler(
-                scheduler, executorRegistry, entry.getKey(), 
TaskState.State.COMPLETE, entry.getValue());
-          });
-        }
-      }
-    }
-    assertTrue(jobStateManager.isJobDone());
-  }
-
-  private boolean isTrueByChance(final Random random, final float chance) {
-    return chance > random.nextDouble();
-  }
-}
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
similarity index 75%
rename from 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index c60eeb3a..f2dd8785 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
@@ -23,17 +24,18 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
 /**
- * Tests {@link FreeSlotSchedulingPolicy}.
+ * Tests {@link FreeSlotSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class})
-public final class FreeSlotSchedulingPolicyTest {
+public final class FreeSlotSchedulingConstraintTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final int 
numRunningTasks,
                                                              final int 
capacity) {
@@ -47,18 +49,20 @@ private static ExecutorRepresenter 
mockExecutorRepresenter(final int numRunningT
 
   @Test
   public void testFreeSlot() {
-    final SchedulingPolicy schedulingPolicy = new FreeSlotSchedulingPolicy();
+    final SchedulingConstraint schedulingConstraint = new 
FreeSlotSchedulingConstraint();
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
     final Task task = mock(Task.class);
+    
when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(true));
 
     final Set<ExecutorRepresenter> executorRepresenterList = new 
HashSet<>(Arrays.asList(a0, a1));
 
-    final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, 
task);
+    final Set<ExecutorRepresenter> candidateExecutors = 
executorRepresenterList.stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task))
+        .collect(Collectors.toSet());
 
-    final Set<ExecutorRepresenter> expectedExecutors = new 
HashSet<>(Arrays.asList(a1));
+    final Set<ExecutorRepresenter> expectedExecutors = 
Collections.singleton(a1);
     assertEquals(expectedExecutors, candidateExecutors);
   }
 }
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
index 60311a63..bf6ebc8a 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
@@ -44,7 +44,7 @@ private static ExecutorRepresenter 
mockExecutorRepresenter(final int numRunningT
   }
 
   @Test
-  public void testRoundRobin() {
+  public void test() {
     final SchedulingPolicy schedulingPolicy = new 
MinOccupancyFirstSchedulingPolicy();
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
@@ -52,13 +52,9 @@ public void testRoundRobin() {
 
     final Task task = mock(Task.class);
 
-    final Set<ExecutorRepresenter> executorRepresenterList = new 
HashSet<>(Arrays.asList(a0, a1, a2));
+    final List<ExecutorRepresenter> executorRepresenterList = 
Arrays.asList(a0, a1, a2);
 
-    final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, 
task);
-
-    final Set<ExecutorRepresenter> expectedExecutors = new 
HashSet<>(Arrays.asList(a0));
-    assertEquals(expectedExecutors, candidateExecutors);
+    assertEquals(a0, schedulingPolicy.selectExecutor(executorRepresenterList, 
task));
   }
 }
 
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index 41f9642c..387c6b90 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -16,14 +16,11 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
-import java.util.Collection;
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -94,23 +91,4 @@ static void sendTaskStateEventToScheduler(final Scheduler 
scheduler,
     scheduler.onTaskStateReportFromExecutor(scheduledExecutor.getExecutorId(), 
taskId, attemptIdx,
         newState, null, cause);
   }
-
-  static void sendTaskStateEventToScheduler(final Scheduler scheduler,
-                                            final ExecutorRegistry 
executorRegistry,
-                                            final String taskId,
-                                            final TaskState.State newState,
-                                            final int attemptIdx) {
-    sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, 
newState, attemptIdx, null);
-  }
-
-  static void mockSchedulingBySchedulerRunner(final 
PendingTaskCollectionPointer pendingTaskCollectionPointer,
-                                              final SchedulingPolicy 
schedulingPolicy,
-                                              final JobStateManager 
jobStateManager,
-                                              final ExecutorRegistry 
executorRegistry,
-                                              final boolean 
scheduleOnlyTheFirstStage) {
-    final SchedulerRunner schedulerRunner =
-        new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, 
executorRegistry);
-    schedulerRunner.scheduleJob(jobStateManager);
-    schedulerRunner.doScheduleTaskList();
-  }
 }
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
similarity index 83%
rename from 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
index 2f538ee9..772c587e 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -25,16 +26,18 @@
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 /**
- * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
+ * Test cases for {@link SourceLocationAwareSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
-public final class SourceLocationAwareSchedulingPolicyTest {
+public final class SourceLocationAwareSchedulingConstraintTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
   private static final String SITE_2 = "BUSAN";
@@ -46,12 +49,12 @@ private static ExecutorRepresenter 
mockExecutorRepresenter(final String executor
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a 
{@link Task} when
+   * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a 
{@link Task} when
    * there are no executors in appropriate location(s).
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() {
-    final SchedulingPolicy schedulingPolicy = new 
SourceLocationAwareSchedulingPolicy();
+    final SchedulingConstraint schedulingConstraint = new 
SourceLocationAwareSchedulingConstraint();
 
     // Prepare test scenario
     final Task task = CreateTask.withReadablesWithSourceLocations(
@@ -59,16 +62,17 @@ public void testSourceLocationAwareSchedulingNotAvailable() 
{
     final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
     final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
 
-    assertEquals(Collections.emptySet(),
-        schedulingPolicy.filterExecutorRepresenters(new 
HashSet<>(Arrays.asList(e0, e1)), task));
+    assertEquals(Collections.emptySet(), Arrays.asList(e0, e1).stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task))
+        .collect(Collectors.toSet()));
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should properly schedule TGs 
with multiple source locations.
+   * {@link SourceLocationAwareSchedulingConstraint} should properly schedule 
TGs with multiple source locations.
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() {
-    final SchedulingPolicy schedulingPolicy = new 
SourceLocationAwareSchedulingPolicy();
+    final SchedulingConstraint schedulingConstraint = new 
SourceLocationAwareSchedulingConstraint();
     // Prepare test scenario
     final Task task0 = CreateTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
@@ -83,8 +87,7 @@ public void 
testSourceLocationAwareSchedulingWithMultiSource() {
 
     final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
     for (final Task task : new HashSet<>(Arrays.asList(task0, task1, task2, 
task3))) {
-      assertEquals(new HashSet<>(Collections.singletonList(e)), 
schedulingPolicy.filterExecutorRepresenters(
-          new HashSet<>(Collections.singletonList(e)), task));
+      assertTrue(schedulingConstraint.testSchedulability(e, task));
     }
   }
 
@@ -103,6 +106,7 @@ private static Task doCreate(final Collection<Readable> 
readables) {
           readable));
       when(mockInstance.getTaskId()).thenReturn(String.format("T-%d", 
taskIndex.getAndIncrement()));
       when(mockInstance.getIrVertexIdToReadable()).thenReturn(readableMap);
+      
when(mockInstance.getPropertyValue(SourceLocationAwareSchedulingProperty.class)).thenReturn(Optional.of(true));
       return mockInstance;
     }
 
diff --git 
a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
 
b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
index f8c4f902..3c74fece 100644
--- 
a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
@@ -23,7 +23,7 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.test.EmptyComponents;
 import org.apache.reef.tang.Injector;
@@ -63,12 +63,12 @@ public void testSplitScheduleGroupByPullStageEdges() throws 
Exception {
     final Stage s0 = stages.next();
     final Stage s1 = stages.next();
 
-    assertNotEquals(s0.getScheduleGroupIndex(), s1.getScheduleGroupIndex());
+    assertNotEquals(s0.getScheduleGroup(), s1.getScheduleGroup());
   }
 
-  private static final IRVertex newIRVertex(final int scheduleGroupIndex, 
final int parallelism) {
+  private static final IRVertex newIRVertex(final int scheduleGroup, final int 
parallelism) {
     final IRVertex irVertex = new OperatorVertex(EMPTY_TRANSFORM);
-    
irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+    
irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
     irVertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
     return irVertex;
   }
diff --git 
a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
 
b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
index a60cf880..515bd951 100644
--- 
a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -24,7 +24,7 @@
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.test.EmptyComponents;
 import org.apache.reef.tang.Tang;
@@ -56,21 +56,21 @@ public void setup() throws InjectionException {
 
   /**
    * @param parallelism {@link ParallelismProperty} value for the new vertex
-   * @param scheduleGroupIndex {@link ScheduleGroupIndexProperty} value for 
the new vertex
+   * @param scheduleGroup {@link ScheduleGroupProperty} value for the new 
vertex
    * @param otherProperties other {@link VertexExecutionProperty} for the new 
vertex
    * @return new {@link IRVertex}
    */
-  private static IRVertex newVertex(final int parallelism, final int 
scheduleGroupIndex,
+  private static IRVertex newVertex(final int parallelism, final int 
scheduleGroup,
                                     final List<VertexExecutionProperty> 
otherProperties) {
     final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
     vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
-    
vertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex));
+    
vertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
     otherProperties.forEach(property -> 
vertex.getExecutionProperties().put(property));
     return vertex;
   }
 
   /**
-   * A simple case where two vertices have common parallelism and 
ScheduleGroupIndex so that get merged into one stage.
+   * A simple case where two vertices have common parallelism and 
ScheduleGroup so that get merged into one stage.
    */
   @Test
   public void testLinear() {
@@ -101,10 +101,10 @@ public void testSplitByParallelism() {
   }
 
   /**
-   * A simple case where two vertices have different ScheduleGroupIndex.
+   * A simple case where two vertices have different ScheduleGroup.
    */
   @Test
-  public void testSplitByScheduleGroupIndex() {
+  public void testSplitByScheduleGroup() {
     final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
     final IRVertex v0 = newVertex(1, 0, Collections.emptyList());
     final IRVertex v1 = newVertex(1, 1, Collections.emptyList());
diff --git 
a/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
 
b/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
new file mode 100644
index 00000000..443ffca8
--- /dev/null
+++ 
b/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link SchedulingConstraintRegistry}.
+ */
+public final class SchedulingConstraintnRegistryTest {
+  @Test
+  public void testSchedulingConstraintRegistry() throws InjectionException {
+    final SchedulingConstraintRegistry registry = 
Tang.Factory.getTang().newInjector()
+        .getInstance(SchedulingConstraintRegistry.class);
+    assertEquals(FreeSlotSchedulingConstraint.class, 
getConstraintOf(ExecutorSlotComplianceProperty.class, registry));
+    assertEquals(ContainerTypeAwareSchedulingConstraint.class,
+        getConstraintOf(ExecutorPlacementProperty.class, registry));
+    assertEquals(SourceLocationAwareSchedulingConstraint.class,
+        getConstraintOf(SourceLocationAwareSchedulingProperty.class, 
registry));
+  }
+
+  private static Class<? extends SchedulingConstraint> getConstraintOf(
+      final Class<? extends VertexExecutionProperty> property, final 
SchedulingConstraintRegistry registry) {
+    return registry.get(property)
+        .orElseThrow(() -> new RuntimeException(String.format(
+            "No SchedulingConstraint found for property %s", property)))
+        .getClass();
+  }
+}
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
index 5cf95b5d..33e4988c 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -24,7 +24,7 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
@@ -54,7 +54,7 @@
   @Test
   public void testAnnotatingPass() {
     final AnnotatingPass scheduleGroupPass = new DefaultScheduleGroupPass();
-    assertEquals(ScheduleGroupIndexProperty.class, 
scheduleGroupPass.getExecutionPropertyToModify());
+    assertEquals(ScheduleGroupProperty.class, 
scheduleGroupPass.getExecutionPropertyToModify());
   }
 
   /**
@@ -67,11 +67,11 @@ public void testTopologicalOrdering() throws Exception {
         new TestPolicy(), "");
 
     for (final IRVertex irVertex : processedDAG.getTopologicalSort()) {
-      final Integer currentScheduleGroupIndex = 
irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get();
-      final Integer largestScheduleGroupIndexOfParent = 
processedDAG.getParents(irVertex.getId()).stream()
-          .mapToInt(v -> 
v.getPropertyValue(ScheduleGroupIndexProperty.class).get())
+      final Integer currentScheduleGroup = 
irVertex.getPropertyValue(ScheduleGroupProperty.class).get();
+      final Integer largestScheduleGroupOfParent = 
processedDAG.getParents(irVertex.getId()).stream()
+          .mapToInt(v -> v.getPropertyValue(ScheduleGroupProperty.class).get())
           .max().orElse(0);
-      assertTrue(currentScheduleGroupIndex >= 
largestScheduleGroupIndexOfParent);
+      assertTrue(currentScheduleGroup >= largestScheduleGroupOfParent);
     }
   }
 
@@ -155,31 +155,31 @@ public void testTopologicalOrdering() throws Exception {
   }
 
   /**
-   * Asserts that the {@link ScheduleGroupIndexProperty} is equal to {@code 
expected}.
+   * Asserts that the {@link ScheduleGroupProperty} is equal to {@code 
expected}.
    * @param expected the expected property value
    * @param vertex the vertex to test
    */
-  private static void assertScheduleGroupIndex(final int expected, final 
IRVertex vertex) {
-    assertEquals(expected, getScheduleGroupIndex(vertex));
+  private static void assertScheduleGroup(final int expected, final IRVertex 
vertex) {
+    assertEquals(expected, getScheduleGroup(vertex));
   }
 
   /**
    * @param vertex a vertex
-   * @return {@link ScheduleGroupIndexProperty} of {@code vertex}
+   * @return {@link ScheduleGroupProperty} of {@code vertex}
    */
-  private static int getScheduleGroupIndex(final IRVertex vertex) {
-    return vertex.getPropertyValue(ScheduleGroupIndexProperty.class)
+  private static int getScheduleGroup(final IRVertex vertex) {
+    return vertex.getPropertyValue(ScheduleGroupProperty.class)
         .orElseThrow(() -> new RuntimeException(String.format("ScheduleGroup 
not set for %s", vertex.getId())));
   }
 
   /**
-   * Ensures that all vertices in {@code vertices} have different {@link 
ScheduleGroupIndexProperty} value.
+   * Ensures that all vertices in {@code vertices} have different {@link 
ScheduleGroupProperty} value.
    * @param vertices vertices to test
    */
-  private static void assertDifferentScheduleGroupIndex(final 
Collection<IRVertex> vertices) {
+  private static void assertDifferentScheduleGroup(final Collection<IRVertex> 
vertices) {
     final Set<Integer> indices = new HashSet<>();
     vertices.forEach(v -> {
-      final int idx = getScheduleGroupIndex(v);
+      final int idx = getScheduleGroup(v);
       assertFalse(indices.contains(idx));
       indices.add(idx);
     });
@@ -194,7 +194,7 @@ public void testBranch() {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, 
DataFlowModelProperty.Value.Pull);
     pass.apply(dag.left());
-    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+    dag.right().forEach(v -> assertScheduleGroup(0, v));
   }
 
   /**
@@ -206,12 +206,12 @@ public void testBranchWhenMultipleInEdgeNotAllowed() {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, 
DataFlowModelProperty.Value.Pull);
     pass.apply(dag.left());
-    dag.right().subList(0, 4).forEach(v -> assertScheduleGroupIndex(0, v));
-    dag.right().subList(4, 5).forEach(v -> assertScheduleGroupIndex(1, v));
+    dag.right().subList(0, 4).forEach(v -> assertScheduleGroup(0, v));
+    dag.right().subList(4, 5).forEach(v -> assertScheduleGroup(1, v));
   }
 
   /**
-   * Test scenario to determine whether push edges properly enforces same 
scheduleGroupIndex or not.
+   * Test scenario to determine whether push edges properly enforces same 
scheduleGroup or not.
    */
   @Test
   public void testBranchWithPush() {
@@ -219,7 +219,7 @@ public void testBranchWithPush() {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, 
DataFlowModelProperty.Value.Push);
     pass.apply(dag.left());
-    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+    dag.right().forEach(v -> assertScheduleGroup(0, v));
   }
 
   /**
@@ -230,7 +230,7 @@ public void testBranchWithBroadcast() {
     final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, 
true, true);
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.BroadCast, 
DataFlowModelProperty.Value.Pull);
-    assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+    assertDifferentScheduleGroup(pass.apply(dag.left()).getVertices());
   }
 
   /**
@@ -241,7 +241,7 @@ public void testBranchWithShuffle() {
     final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(true, 
false, true);
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, 
DataFlowModelProperty.Value.Pull);
-    assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices());
+    assertDifferentScheduleGroup(pass.apply(dag.left()).getVertices());
   }
 
   /**
@@ -253,11 +253,11 @@ public void testJoin() {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, 
DataFlowModelProperty.Value.Pull);
     pass.apply(dag.left());
-    final int idxForFirstScheduleGroup = 
getScheduleGroupIndex(dag.right().get(0));
-    final int idxForSecondScheduleGroup = 
getScheduleGroupIndex(dag.right().get(2));
-    dag.right().subList(0, 2).forEach(v -> 
assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
-    dag.right().subList(2, 4).forEach(v -> 
assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
-    dag.right().subList(4, 6).forEach(v -> assertScheduleGroupIndex(2, v));
+    final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
+    final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
+    dag.right().subList(0, 2).forEach(v -> 
assertScheduleGroup(idxForFirstScheduleGroup, v));
+    dag.right().subList(2, 4).forEach(v -> 
assertScheduleGroup(idxForSecondScheduleGroup, v));
+    dag.right().subList(4, 6).forEach(v -> assertScheduleGroup(2, v));
   }
 
   /**
@@ -269,7 +269,7 @@ public void testJoinWithPush() {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, 
DataFlowModelProperty.Value.Push);
     pass.apply(dag.left());
-    dag.right().forEach(v -> assertScheduleGroupIndex(0, v));
+    dag.right().forEach(v -> assertScheduleGroup(0, v));
   }
 
   /**
@@ -283,9 +283,9 @@ public void testJoinWithSinglePush() {
     dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
         
.getExecutionProperties().put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull));
     pass.apply(dag.left());
-    final int idxForFirstScheduleGroup = 
getScheduleGroupIndex(dag.right().get(0));
-    final int idxForSecondScheduleGroup = 
getScheduleGroupIndex(dag.right().get(2));
-    dag.right().subList(0, 2).forEach(v -> 
assertScheduleGroupIndex(idxForFirstScheduleGroup, v));
-    dag.right().subList(2, 6).forEach(v -> 
assertScheduleGroupIndex(idxForSecondScheduleGroup, v));
+    final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
+    final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
+    dag.right().subList(0, 2).forEach(v -> 
assertScheduleGroup(idxForFirstScheduleGroup, v));
+    dag.right().subList(2, 6).forEach(v -> 
assertScheduleGroup(idxForSecondScheduleGroup, v));
   }
 }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
index b1ea8f83..41edef3f 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -28,21 +28,21 @@
   @Test
   public void testDisaggregationPolicy() {
     final Policy disaggregationPolicy = new DisaggregationPolicy();
-    assertEquals(14, disaggregationPolicy.getCompileTimePasses().size());
+    assertEquals(16, disaggregationPolicy.getCompileTimePasses().size());
     assertEquals(0, disaggregationPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testPadoPolicy() {
     final Policy padoPolicy = new PadoPolicy();
-    assertEquals(16, padoPolicy.getCompileTimePasses().size());
+    assertEquals(18, padoPolicy.getCompileTimePasses().size());
     assertEquals(0, padoPolicy.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
     final Policy dataSkewPolicy = new DataSkewPolicy();
-    assertEquals(18, dataSkewPolicy.getCompileTimePasses().size());
+    assertEquals(20, dataSkewPolicy.getCompileTimePasses().size());
     assertEquals(1, dataSkewPolicy.getRuntimePasses().size());
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to