This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new d5df47c  [NEMO-168] Fine-grained Execution Property Check (#101)
d5df47c is described below

commit d5df47c11eadd7f20d4223ca411682809b88917c
Author: Won Wook SONG <[email protected]>
AuthorDate: Fri Aug 10 17:57:40 2018 +0900

    [NEMO-168] Fine-grained Execution Property Check (#101)
    
    JIRA: [NEMO-168: Fine-grained Execution Property 
Check](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-168)
    
    **Major changes:**
    - Implements setPropertyPermanently method, to be able to permanently set 
execution properties.
    - Implements finalization of execution properties in the execution property 
map.
    - Removed 'strict checking' mode from PolicyBuilder, as it is no longer 
necessary.
    
    **Minor changes to note:**
    - Fixes overused RuntimeExceptions into appropriate exceptions.
    - Type specifications where it was neglected.
    - Replaced calling getExecutionProperty then manually doing something into 
getPropertyValue and setProperty methods.
    
    **Tests for the changes:**
    - I've added a buildEmptyDAG method to be able to create a mock DAG that 
can be used throughout the tests.
    - I've also added unit tests to test the methods newly implemented in the 
execution property map.
    - I've added appropriate unit tests that expect particular outputs from 
various policies.
    
    **Other comments:**
    - N/A
    
    resolves 
[NEMO-168](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-168)
---
 .../java/edu/snu/nemo/common/dag/DAGBuilder.java   |  11 +-
 .../java/edu/snu/nemo/common/ir/edge/IREdge.java   |  18 +++-
 .../ir/executionproperty/ExecutionPropertyMap.java |  40 +++++--
 .../edu/snu/nemo/common/ir/vertex/IRVertex.java    |  17 ++-
 .../edu/snu/nemo/common/test/EmptyComponents.java  |  28 +++++
 .../ExecutionPropertyMapTest.java                  |  19 ++++
 .../annotating/DefaultDataStorePass.java           |   2 +-
 .../annotating/DefaultScheduleGroupPass.java       |   2 +-
 .../DisaggregationEdgeDataStorePass.java           |   5 +-
 .../annotating/LargeShuffleDataFlowPass.java       |   4 +-
 .../LargeShuffleDataPersistencePass.java           |   2 +-
 .../annotating/LargeShuffleDataStorePass.java      |   5 +-
 .../annotating/LargeShuffleDecoderPass.java        |   2 +-
 .../annotating/LargeShuffleDecompressionPass.java  |   7 +-
 .../annotating/LargeShuffleEncoderPass.java        |   5 +-
 .../annotating/LargeShufflePartitionerPass.java    |   7 +-
 .../annotating/LargeShuffleResourceSlotPass.java   |   6 +-
 .../annotating/LargeSuffleCompressionPass.java     |   7 +-
 .../annotating/ResourceLocalityPass.java           |   4 +-
 .../compiletime/annotating/ResourceSitePass.java   |   8 +-
 .../compiletime/annotating/ResourceSlotPass.java   |   4 +-
 .../compiletime/annotating/SkewDataStorePass.java  |   4 +-
 .../annotating/SkewMetricCollectionPass.java       |   3 +-
 .../annotating/SkewPartitionerPass.java            |   2 +-
 .../annotating/SkewResourceSkewedDataPass.java     |   4 +-
 .../annotating/TransientResourceDataFlowPass.java  |   4 +-
 .../annotating/TransientResourceDataStorePass.java |   6 +-
 .../annotating/TransientResourcePriorityPass.java  |   6 +-
 .../compiler/optimizer/policy/BasicPullPolicy.java |   2 +-
 .../compiler/optimizer/policy/BasicPushPolicy.java |   2 +-
 .../policy/ConditionalLargeShufflePolicy.java      |   2 +-
 .../compiler/optimizer/policy/DataSkewPolicy.java  |   4 +-
 .../compiler/optimizer/policy/DefaultPolicy.java   |   2 +-
 .../policy/DefaultPolicyWithSeparatePass.java      |   2 +-
 .../optimizer/policy/DisaggregationPolicy.java     |   6 +-
 .../optimizer/policy/LargeShufflePolicy.java       |   8 +-
 .../compiler/optimizer/policy/PolicyBuilder.java   |  18 ----
 .../optimizer/policy/TransientResourcePolicy.java  |   2 +-
 .../compiler/optimizer/policy/PolicyImplTest.java  | 115 +++++++++++++++++++++
 .../annotating/DefaultScheduleGroupPassTest.java   |   6 +-
 .../java/edu/snu/nemo/examples/spark/MRJava.java   |   2 +-
 .../runtime/common/plan/PhysicalPlanGenerator.java |   3 +-
 .../nemo/runtime/common/plan/StagePartitioner.java |   6 +-
 .../common/plan/PhysicalPlanGeneratorTest.java     |   6 +-
 .../runtime/common/plan/StagePartitionerTest.java  |   6 +-
 .../executor/datatransfer/DataTransferTest.java    |  12 +--
 46 files changed, 308 insertions(+), 128 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java 
b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index ffdd468..295b56c 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.common.dag;
 
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
@@ -193,7 +194,7 @@ public final class DAGBuilder<V extends Vertex, E extends 
Edge<V>> implements Se
     stack.push(vertex);
     // When we encounter a vertex that we've already gone through, then there 
is a cycle.
     if 
(outgoingEdges.get(vertex).stream().map(Edge::getDst).anyMatch(stack::contains))
 {
-      throw new RuntimeException("DAG contains a cycle");
+      throw new CompileTimeOptimizationException("DAG contains a cycle");
     } else {
       outgoingEdges.get(vertex).stream().map(Edge::getDst)
           .filter(v -> !visited.contains(v))
@@ -215,7 +216,7 @@ public final class DAGBuilder<V extends Vertex, E extends 
Edge<V>> implements Se
           .filter(v -> !(v instanceof SourceVertex))
           .map(V::getId)
           .collect(Collectors.toList()).toString();
-      throw new RuntimeException("DAG source check failed while building DAG. 
" + problematicVertices);
+      throw new CompileTimeOptimizationException("DAG source check failed 
while building DAG. " + problematicVertices);
     }
   }
 
@@ -232,7 +233,7 @@ public final class DAGBuilder<V extends Vertex, E extends 
Edge<V>> implements Se
       final String problematicVertices = verticesToObserve.get().filter(v ->
           !(v instanceof OperatorVertex || v instanceof LoopVertex))
           .map(V::getId).collect(Collectors.toList()).toString();
-      throw new RuntimeException("DAG sink check failed while building DAG: " 
+ problematicVertices);
+      throw new CompileTimeOptimizationException("DAG sink check failed while 
building DAG: " + problematicVertices);
     }
   }
 
@@ -245,7 +246,7 @@ public final class DAGBuilder<V extends Vertex, E extends 
Edge<V>> implements Se
         .filter(e -> Boolean.TRUE.equals(e.isSideInput()))
         .filter(e -> 
DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
         .forEach(e -> {
-          throw new RuntimeException("DAG execution property check: "
+          throw new CompileTimeOptimizationException("DAG execution property 
check: "
               + "SideInput edge is not compatible with push" + e.getId());
         }));
     // DataSizeMetricCollection is not compatible with Push (All data have to 
be stored before the data collection)
@@ -254,7 +255,7 @@ public final class DAGBuilder<V extends Vertex, E extends 
Edge<V>> implements Se
                       
.equals(e.getPropertyValue(MetricCollectionProperty.class)))
         .filter(e -> 
DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
         .forEach(e -> {
-          throw new RuntimeException("DAG execution property check: "
+          throw new CompileTimeOptimizationException("DAG execution property 
check: "
               + "DataSizeMetricCollection edge is not compatible with push" + 
e.getId());
         }));
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java 
b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
index 080fc4b..d2c75e3 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import java.io.Serializable;
 import java.util.Optional;
-import java.util.function.Consumer;
 
 /**
  * Physical execution plan of intermediate data movement.
@@ -68,12 +67,21 @@ public final class IREdge extends Edge<IRVertex> {
 
   /**
    * Set an executionProperty of the IREdge.
-   *
    * @param executionProperty the execution property.
    * @return the IREdge with the execution property set.
    */
   public IREdge setProperty(final EdgeExecutionProperty<?> executionProperty) {
-    executionProperties.put(executionProperty);
+    executionProperties.put(executionProperty, false);
+    return this;
+  }
+
+  /**
+   * Set an executionProperty of the IREdge, permanently.
+   * @param executionProperty the execution property.
+   * @return the IREdge with the execution property set.
+   */
+  public IREdge setPropertyPermanently(final EdgeExecutionProperty<?> 
executionProperty) {
+    executionProperties.put(executionProperty, true);
     return this;
   }
 
@@ -92,7 +100,7 @@ public final class IREdge extends Edge<IRVertex> {
   /**
    * @return the ExecutionPropertyMap of the IREdge.
    */
-  public ExecutionPropertyMap getExecutionProperties() {
+  public ExecutionPropertyMap<EdgeExecutionProperty> getExecutionProperties() {
     return executionProperties;
   }
 
@@ -117,7 +125,7 @@ public final class IREdge extends Edge<IRVertex> {
    * @param thatEdge the edge to copy executionProperties to.
    */
   public void copyExecutionPropertiesTo(final IREdge thatEdge) {
-    
this.getExecutionProperties().forEachProperties((Consumer<EdgeExecutionProperty>)
 thatEdge::setProperty);
+    this.getExecutionProperties().forEachProperties(thatEdge::setProperty);
   }
 
   @Override
diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
 
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
index a2ef8d9..869f725 100644
--- 
a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
@@ -15,11 +15,11 @@
  */
 package edu.snu.nemo.common.ir.executionproperty;
 
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -42,6 +42,7 @@ import java.util.stream.Stream;
 public final class ExecutionPropertyMap<T extends ExecutionProperty> 
implements Serializable {
   private final String id;
   private final Map<Class<? extends ExecutionProperty>, T> properties = new 
HashMap<>();
+  private final Set<Class<? extends ExecutionProperty>> finalizedProperties = 
new HashSet<>();
 
   /**
    * Constructor for ExecutionPropertyMap class.
@@ -64,6 +65,8 @@ public final class ExecutionPropertyMap<T extends 
ExecutionProperty> implements
     final ExecutionPropertyMap<EdgeExecutionProperty> map = new 
ExecutionPropertyMap<>(irEdge.getId());
     map.put(CommunicationPatternProperty.of(commPattern));
     map.put(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+    map.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
+    map.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY));
     switch (commPattern) {
       case Shuffle:
         
map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
@@ -105,11 +108,36 @@ public final class ExecutionPropertyMap<T extends 
ExecutionProperty> implements
   }
 
   /**
-   * Put the given execution property  in the ExecutionPropertyMap.
+   * Put the given execution property  in the ExecutionPropertyMap. By 
default, it does not finalize the property.
    * @param executionProperty execution property to insert.
-   * @return the previous execution property, or null if there was no 
execution property with the specified property key
+   * @return the previous execution property, or null if there was no 
execution property
+   * with the specified property key.
    */
   public T put(final T executionProperty) {
+    return this.put(executionProperty, false);
+  }
+
+  /**
+   * Put the given execution property in the ExecutionPropertyMap.
+   * @param executionProperty execution property to insert.
+   * @param finalize whether or not to finalize the execution property.
+   * @return the previous execution property, or null if there was no 
execution property
+   * with the specified property key.
+   */
+  public T put(final T executionProperty, final Boolean finalize) {
+    // check if the property has been already finalized. We don't mind 
overwriting an identical value.
+    if (finalizedProperties.contains(executionProperty.getClass())
+        && properties.get(executionProperty.getClass()) != null
+        && 
!properties.get(executionProperty.getClass()).equals(executionProperty)) {
+      throw new CompileTimeOptimizationException("Trying to overwrite a 
finalized execution property ["
+          + executionProperty.getClass().getSimpleName() + "] from ["
+          + properties.get(executionProperty.getClass()).getValue() + "] to [" 
+ executionProperty.getValue() + "]");
+    }
+
+    // start the actual put process.
+    if (finalize) {
+      this.finalizedProperties.add(executionProperty.getClass());
+    }
     return properties.put(executionProperty.getClass(), executionProperty);
   }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index de7359b..67fc42d 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -22,7 +22,6 @@ import 
edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 import java.io.Serializable;
 import java.util.Optional;
-import java.util.function.Consumer;
 
 /**
  * The basic unit of operation in a dataflow program, as well as the most 
important data structure in Nemo.
@@ -49,7 +48,7 @@ public abstract class IRVertex extends Vertex {
    * @param thatVertex the edge to copy executionProperties to.
    */
   public final void copyExecutionPropertiesTo(final IRVertex thatVertex) {
-    
this.getExecutionProperties().forEachProperties((Consumer<VertexExecutionProperty>)
 thatVertex::setProperty);
+    this.getExecutionProperties().forEachProperties(thatVertex::setProperty);
   }
 
   /**
@@ -58,7 +57,17 @@ public abstract class IRVertex extends Vertex {
    * @return the IRVertex with the execution property set.
    */
   public final IRVertex setProperty(final VertexExecutionProperty<?> 
executionProperty) {
-    executionProperties.put(executionProperty);
+    executionProperties.put(executionProperty, false);
+    return this;
+  }
+
+  /**
+   * Set an executionProperty of the IRVertex, permanently.
+   * @param executionProperty new execution property.
+   * @return the IRVertex with the execution property set.
+   */
+  public final IRVertex setPropertyPermanently(final 
VertexExecutionProperty<?> executionProperty) {
+    executionProperties.put(executionProperty, true);
     return this;
   }
 
@@ -76,7 +85,7 @@ public abstract class IRVertex extends Vertex {
   /**
    * @return the ExecutionPropertyMap of the IRVertex.
    */
-  public final ExecutionPropertyMap getExecutionProperties() {
+  public final ExecutionPropertyMap<VertexExecutionProperty> 
getExecutionProperties() {
     return executionProperties;
   }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java 
b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
index 5af5688..88a768a 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
@@ -15,8 +15,14 @@
  */
 package edu.snu.nemo.common.test;
 
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
@@ -32,6 +38,28 @@ public final class EmptyComponents {
   private EmptyComponents() {
   }
 
+  public static DAG<IRVertex, IREdge> buildEmptyDAG() {
+    DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex s = new EmptyComponents.EmptySourceVertex<>("s");
+    final IRVertex t1 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t1"));
+    final IRVertex t2 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t2"));
+    final IRVertex t3 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t3"));
+    final IRVertex t4 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t4"));
+    final IRVertex t5 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t5"));
+    dagBuilder.addVertex(s);
+    dagBuilder.addVertex(t1);
+    dagBuilder.addVertex(t2);
+    dagBuilder.addVertex(t3);
+    dagBuilder.addVertex(t4);
+    dagBuilder.addVertex(t5);
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.OneToOne, s, t1));
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.Shuffle, t1, t2));
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.OneToOne, t2, t3));
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.Shuffle, t3, t4));
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.OneToOne, t2, t5));
+    return dagBuilder.build();
+  }
+
   /**
    * An empty transform.
    * @param <I> input type.
diff --git 
a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
 
b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index d5aba03..fc2dc0c 100644
--- 
a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ 
b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.common.ir.executionproperty;
 
 import edu.snu.nemo.common.coder.DecoderFactory;
 import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -24,7 +25,9 @@ import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -102,4 +105,20 @@ public class ExecutionPropertyMapTest {
     assertTrue(map0.equals(map1));
     assertTrue(map1.equals(map0));
   }
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testFinalizedProperty() {
+    // this should work without a problem..
+    final ExecutionPropertyMap<ExecutionProperty> map = new 
ExecutionPropertyMap<>("map");
+    map.put(ParallelismProperty.of(1), false);
+    assertEquals(ParallelismProperty.of(1), 
map.put(ParallelismProperty.of(2)));
+    assertEquals(ParallelismProperty.of(2), map.put(ParallelismProperty.of(3), 
true));
+
+    // test exception
+    expectedException.expect(CompileTimeOptimizationException.class);
+    map.put(ParallelismProperty.of(4));
+  }
 }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
index 6cdb126..e40b95a 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
@@ -37,7 +37,7 @@ public final class DefaultDataStorePass extends 
AnnotatingPass {
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.getVertices().forEach(vertex -> {
       dag.getIncomingEdgesOf(vertex).stream()
-          .filter(edge -> 
!edge.getExecutionProperties().containsKey(DataStoreProperty.class))
+          .filter(edge -> 
!edge.getPropertyValue(DataStoreProperty.class).isPresent())
           .forEach(edge -> edge.setProperty(
               DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 190a552..1aa4dcf 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -213,7 +213,7 @@ public final class DefaultScheduleGroupPass extends 
AnnotatingPass {
       boolean usedCurrentScheduleGroup = false;
       for (final IRVertex irVertex : scheduleGroup.vertices) {
         if 
(!irVertex.getPropertyValue(ScheduleGroupProperty.class).isPresent()) {
-          
irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
+          
irVertex.setProperty(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
           usedCurrentScheduleGroup = true;
         }
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index e498c74..262f0ca 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -39,9 +39,8 @@ public final class DisaggregationEdgeDataStorePass extends 
AnnotatingPass {
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the 
DAG with GlusterFileStore.
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      inEdges.forEach(edge -> {
-        
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore));
-      });
+      inEdges.forEach(edge ->
+        
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore)));
     });
     return dag;
   }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
index 16a3b81..d5d18ac 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
@@ -43,9 +43,9 @@ public final class LargeShuffleDataFlowPass extends 
AnnotatingPass {
       inEdges.forEach(edge -> {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push)); 
// Push to the merger vertex.
+          
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push)); 
// Push to the merger vertex.
         } else {
-          edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+          
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
index 7ebad49..d7901f2 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
@@ -42,7 +42,7 @@ public final class LargeShuffleDataPersistencePass extends 
AnnotatingPass {
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
           final DataFlowProperty.Value dataFlowModel = 
irEdge.getPropertyValue(DataFlowProperty.class).get();
           if (DataFlowProperty.Value.Push.equals(dataFlowModel)) {
-            
irEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
+            
irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
           }
         }));
     return dag;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
index c80f675..40b4bb4 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
@@ -46,13 +46,12 @@ public final class LargeShuffleDataStorePass extends 
AnnotatingPass {
           if (CommunicationPatternProperty.Value.Shuffle
           
.equals(edgeToMerger.getPropertyValue(CommunicationPatternProperty.class).get()))
 {
             // Pass data through memory to the merger vertex.
-            edgeToMerger.setProperty(DataStoreProperty
-                .of(DataStoreProperty.Value.SerializedMemoryStore));
+            
edgeToMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
           }
         });
         dag.getOutgoingEdgesOf(vertex).forEach(edgeFromMerger ->
             // Merge the input data and write it immediately to the remote 
disk.
-            
edgeFromMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
+            
edgeFromMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
       }
     });
     return dag;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
index ed66bcd..978aff1 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
@@ -45,7 +45,7 @@ public final class LargeShuffleDecoderPass extends 
AnnotatingPass {
       inEdges.forEach(edge -> {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          edge.setProperty(DecoderProperty.of(BytesDecoderFactory.of()));
+          
edge.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
index e1b31f2..625b2c9 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
@@ -45,12 +45,11 @@ public final class LargeShuffleDecompressionPass extends 
AnnotatingPass {
       inEdges.forEach(edge -> {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          
edge.setProperty(DecompressionProperty.of(CompressionProperty.Value.None));
+          
edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.None));
 
           dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay -> {
-                
edgeFromRelay.setProperty(DecompressionProperty.of(CompressionProperty.Value.LZ4));
-              });
+              .forEach(edgeFromRelay ->
+                  
edgeFromRelay.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4)));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
index 8cad7b0..43def8b 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
@@ -46,9 +46,8 @@ public final class LargeShuffleEncoderPass extends 
AnnotatingPass {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
           dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay -> {
-                
edgeFromRelay.setProperty(EncoderProperty.of(BytesEncoderFactory.of()));
-              });
+              .forEach(edgeFromRelay ->
+                  
edgeFromRelay.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of())));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
index eb3b2bf..084ab62 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
@@ -47,10 +47,9 @@ public final class LargeShufflePartitionerPass extends 
AnnotatingPass {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
           dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay -> {
-                edgeFromRelay.setProperty(PartitionerProperty.of(
-                    
PartitionerProperty.Value.DedicatedKeyPerElementPartitioner));
-              });
+              .forEach(edgeFromRelay ->
+                  edgeFromRelay.setPropertyPermanently(PartitionerProperty.of(
+                      
PartitionerProperty.Value.DedicatedKeyPerElementPartitioner)));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
index 388f7c2..f474629 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
@@ -37,15 +37,15 @@ public final class LargeShuffleResourceSlotPass extends 
AnnotatingPass {
     // On every vertex that receive push edge, if ResourceSlotProperty is not 
set, put it as false.
     // For other vertices, if ResourceSlotProperty is not set, put it as true.
     dag.getVertices().stream()
-        .filter(v -> 
!v.getExecutionProperties().containsKey(ResourceSlotProperty.class))
+        .filter(v -> 
!v.getPropertyValue(ResourceSlotProperty.class).isPresent())
         .forEach(v -> {
           if (dag.getIncomingEdgesOf(v).stream().anyMatch(
               e -> e.getPropertyValue(DataFlowProperty.class)
                   .orElseThrow(() -> new 
RuntimeException(String.format("DataFlowProperty for %s must be set",
                       e.getId()))).equals(DataFlowProperty.Value.Push))) {
-            v.getExecutionProperties().put(ResourceSlotProperty.of(false));
+            v.setPropertyPermanently(ResourceSlotProperty.of(false));
           } else {
-            v.getExecutionProperties().put(ResourceSlotProperty.of(true));
+            v.setPropertyPermanently(ResourceSlotProperty.of(true));
           }
         });
     return dag;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
index abc9300..08ede72 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
@@ -44,12 +44,11 @@ public final class LargeSuffleCompressionPass extends 
AnnotatingPass {
       inEdges.forEach(edge -> {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          
edge.setProperty(CompressionProperty.of(CompressionProperty.Value.LZ4));
+          
edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
 
           dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay -> {
-                
edgeFromRelay.setProperty(CompressionProperty.of(CompressionProperty.Value.None));
-              });
+              .forEach(edgeFromRelay ->
+                  
edgeFromRelay.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.None)));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
index 4862c7e..53f23df 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
@@ -33,8 +33,8 @@ public final class ResourceLocalityPass extends 
AnnotatingPass {
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     // On every vertex, if ResourceLocalityProperty is not set, put it as true.
     dag.getVertices().stream()
-        .filter(v -> 
!v.getExecutionProperties().containsKey(ResourceLocalityProperty.class))
-        .forEach(v -> 
v.getExecutionProperties().put(ResourceLocalityProperty.of(true)));
+        .filter(v -> 
!v.getPropertyValue(ResourceLocalityProperty.class).isPresent())
+        .forEach(v -> v.setProperty(ResourceLocalityProperty.of(true)));
     return dag;
   }
 }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
index 6189826..005138c 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
@@ -101,11 +101,11 @@ public final class ResourceSitePass extends 
AnnotatingPass {
       if (inEdges.size() == 0) {
         // This vertex is root vertex.
         // Fall back to setting even distribution
-        
irVertex.getExecutionProperties().put(ResourceSiteProperty.of(EMPTY_MAP));
+        irVertex.setProperty(ResourceSiteProperty.of(EMPTY_MAP));
       } else if (isOneToOneEdge(inEdges)) {
         final Optional<HashMap<String, Integer>> property = 
inEdges.iterator().next().getSrc()
-            .getExecutionProperties().get(ResourceSiteProperty.class);
-        
irVertex.getExecutionProperties().put(ResourceSiteProperty.of(property.get()));
+            .getPropertyValue(ResourceSiteProperty.class);
+        irVertex.setProperty(ResourceSiteProperty.of(property.get()));
       } else {
         // This IRVertex has shuffle inEdge(s), or has multiple inEdges.
         final Map<String, Integer> parentLocationShares = new HashMap<>();
@@ -135,7 +135,7 @@ public final class ResourceSitePass extends AnnotatingPass {
           shares.put(nodeName, shares.get(nodeName) + 1);
           remainder--;
         }
-        irVertex.getExecutionProperties().put(ResourceSiteProperty.of(shares));
+        irVertex.setProperty(ResourceSiteProperty.of(shares));
       }
     });
   }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
index 966fb50..f971882 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
@@ -33,8 +33,8 @@ public final class ResourceSlotPass extends AnnotatingPass {
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     // On every vertex, if ResourceSlotProperty is not set, put it as true.
     dag.getVertices().stream()
-        .filter(v -> 
!v.getExecutionProperties().containsKey(ResourceSlotProperty.class))
-        .forEach(v -> 
v.getExecutionProperties().put(ResourceSlotProperty.of(true)));
+        .filter(v -> 
!v.getPropertyValue(ResourceSlotProperty.class).isPresent())
+        .forEach(v -> v.setProperty(ResourceSlotProperty.of(true)));
     return dag;
   }
 }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
index 6405c45..8b1592b 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
@@ -45,9 +45,9 @@ public final class SkewDataStorePass extends AnnotatingPass {
         dag.getIncomingEdgesOf(v).forEach(edge -> {
           // we want it to be in the same stage
           if (edge.equals(edgeToUseMemory)) {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
           } else {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
index fe19942..f2bf9d3 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
@@ -46,7 +46,8 @@ public final class SkewMetricCollectionPass extends 
AnnotatingPass {
           // double checking.
           if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
               .equals(CommunicationPatternProperty.Value.Shuffle)) {
-            
edge.setProperty(MetricCollectionProperty.of(MetricCollectionProperty.Value.DataSkewRuntimePass));
+            edge.setPropertyPermanently(MetricCollectionProperty.of(
+                MetricCollectionProperty.Value.DataSkewRuntimePass));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
index 2b9b903..58c785f 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
@@ -45,7 +45,7 @@ public final class SkewPartitionerPass extends AnnotatingPass 
{
           // double checking.
           if (MetricCollectionProperty.Value.DataSkewRuntimePass
             
.equals(edge.getPropertyValue(MetricCollectionProperty.class).get())) {
-            
edge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
+            
edge.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index ab2f4be..e2ac70f 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -55,8 +55,8 @@ public final class SkewResourceSkewedDataPass extends 
AnnotatingPass {
             .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
     dag.getVertices().stream()
         .filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
-            && 
!v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
-        .forEach(v -> 
v.getExecutionProperties().put(ResourceSkewedDataProperty.of(true)));
+            && 
!v.getPropertyValue(ResourceSkewedDataProperty.class).isPresent())
+        .forEach(v -> v.setProperty(ResourceSkewedDataProperty.of(true)));
 
     return dag;
   }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
index 3433dbe..4da3bcf 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
@@ -44,9 +44,9 @@ public final class TransientResourceDataFlowPass extends 
AnnotatingPass {
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
           if (fromTransientToReserved(edge)) {
-            edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
+            
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push));
           } else {
-            edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+            
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
index bcf0d0e..dcd9132 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
@@ -43,12 +43,12 @@ public final class TransientResourceDataStorePass extends 
AnnotatingPass {
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
           if (fromTransientToReserved(edge) || fromReservedToTransient(edge)) {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
           } else if (CommunicationPatternProperty.Value.OneToOne
               
.equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
           } else {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
index fd1eb5d..cdf25a2 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
@@ -40,12 +40,12 @@ public final class TransientResourcePriorityPass extends 
AnnotatingPass {
     dag.topologicalDo(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (inEdges.isEmpty()) {
-        
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
+        
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
       } else {
         if (hasM2M(inEdges) || allO2OFromReserved(inEdges)) {
-          
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.RESERVED));
+          
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.RESERVED));
         } else {
-          
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
+          
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
         }
       }
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index c251469..31999c8 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -27,7 +27,7 @@ import org.apache.reef.tang.Injector;
  */
 public final class BasicPullPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new DefaultScheduleGroupPass());
   private final Policy policy;
 
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index 31a5d7b..5e40144 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -28,7 +28,7 @@ import org.apache.reef.tang.Injector;
  */
 public final class BasicPushPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new ShuffleEdgePushPass())
           .registerCompileTimePass(new DefaultScheduleGroupPass());
   private final Policy policy;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
index efdf14d..807dd57 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
@@ -31,7 +31,7 @@ import org.apache.reef.tang.Injector;
  */
 public final class ConditionalLargeShufflePolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(false)
+      new PolicyBuilder()
           .registerCompileTimePass(new LargeShuffleCompositePass(), dag -> 
getMaxParallelism(dag) > 300)
           .registerCompileTimePass(new LoopOptimizationCompositePass())
           .registerCompileTimePass(new DefaultCompositePass());
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index be23e6c..9d7eb29 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -30,7 +30,7 @@ import org.apache.reef.tang.Injector;
  */
 public final class DataSkewPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerRuntimePass(new 
DataSkewRuntimePass().setNumSkewedKeys(DataSkewRuntimePass.DEFAULT_NUM_SKEWED_KEYS),
               new SkewCompositePass())
           .registerCompileTimePass(new LoopOptimizationCompositePass())
@@ -45,7 +45,7 @@ public final class DataSkewPolicy implements Policy {
   }
 
   public DataSkewPolicy(final int skewness) {
-    this.policy = new PolicyBuilder(true)
+    this.policy = new PolicyBuilder()
         .registerRuntimePass(new 
DataSkewRuntimePass().setNumSkewedKeys(skewness), new SkewCompositePass())
         .registerCompileTimePass(new LoopOptimizationCompositePass())
         .registerCompileTimePass(new DefaultCompositePass())
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
index 986aec5..a84ac35 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
@@ -27,7 +27,7 @@ import org.apache.reef.tang.Injector;
  */
 public final class DefaultPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new DefaultCompositePass());
   private final Policy policy;
 
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
index c832d1f..39b658b 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
@@ -34,7 +34,7 @@ import java.util.Arrays;
  */
 public final class DefaultPolicyWithSeparatePass implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new DefaultParallelismPass())
           .registerCompileTimePass(new RefactoredPass());
   private final Policy policy;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
index 667b670..0d934ca 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
@@ -29,10 +29,10 @@ import org.apache.reef.tang.Injector;
  */
 public final class DisaggregationPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(false)
+      new PolicyBuilder()
+          .registerCompileTimePass(new DisaggregationEdgeDataStorePass())
           .registerCompileTimePass(new LoopOptimizationCompositePass())
-          .registerCompileTimePass(new DefaultCompositePass())
-          .registerCompileTimePass(new DisaggregationEdgeDataStorePass());
+          .registerCompileTimePass(new DefaultCompositePass());
   private final Policy policy;
 
   /**
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
index 018c403..57423b8 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
@@ -29,10 +29,10 @@ import org.apache.reef.tang.Injector;
  */
 public final class LargeShufflePolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(false)
-        .registerCompileTimePass(new LargeShuffleCompositePass())
-        .registerCompileTimePass(new LoopOptimizationCompositePass())
-        .registerCompileTimePass(new DefaultCompositePass());
+      new PolicyBuilder()
+          .registerCompileTimePass(new LargeShuffleCompositePass())
+          .registerCompileTimePass(new LoopOptimizationCompositePass())
+          .registerCompileTimePass(new DefaultCompositePass());
   private final Policy policy;
 
   /**
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
index bf939b9..13a2507 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -40,27 +40,15 @@ import java.util.function.Predicate;
 public final class PolicyBuilder {
   private final List<CompileTimePass> compileTimePasses;
   private final List<RuntimePass<?>> runtimePasses;
-  private final Set<Class<? extends ExecutionProperty>> 
finalizedExecutionProperties;
   private final Set<Class<? extends ExecutionProperty>> 
annotatedExecutionProperties;
-  private final Boolean strictPrerequisiteCheckMode;
 
   /**
    * Default constructor.
    */
   public PolicyBuilder() {
-    this(false);
-  }
-
-  /**
-   * Constructor.
-   * @param strictPrerequisiteCheckMode whether to use strict prerequisite 
check mode or not.
-   */
-  public PolicyBuilder(final Boolean strictPrerequisiteCheckMode) {
     this.compileTimePasses = new ArrayList<>();
     this.runtimePasses = new ArrayList<>();
-    this.finalizedExecutionProperties = new HashSet<>();
     this.annotatedExecutionProperties = new HashSet<>();
-    this.strictPrerequisiteCheckMode = strictPrerequisiteCheckMode;
     // DataCommunicationPattern is already set when creating the IREdge itself.
     annotatedExecutionProperties.add(CommunicationPatternProperty.class);
     // Some default values are already annotated.
@@ -94,13 +82,7 @@ public final class PolicyBuilder {
     if (compileTimePass instanceof AnnotatingPass) {
       final AnnotatingPass annotatingPass = (AnnotatingPass) compileTimePass;
       
this.annotatedExecutionProperties.add(annotatingPass.getExecutionPropertyToModify());
-      if (strictPrerequisiteCheckMode
-          && 
finalizedExecutionProperties.contains(annotatingPass.getExecutionPropertyToModify()))
 {
-        throw new 
CompileTimeOptimizationException(annotatingPass.getExecutionPropertyToModify()
-            + " should have already been finalized.");
-      }
     }
-    
finalizedExecutionProperties.addAll(compileTimePass.getPrerequisiteExecutionProperties());
 
     this.compileTimePasses.add(compileTimePass);
 
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
index caa29d9..4528f70 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
@@ -29,7 +29,7 @@ import org.apache.reef.tang.Injector;
  */
 public final class TransientResourcePolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new TransientResourceCompositePass())
           .registerCompileTimePass(new LoopOptimizationCompositePass())
           .registerCompileTimePass(new DefaultCompositePass());
diff --git 
a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
 
b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
new file mode 100644
index 0000000..2b2c7b7
--- /dev/null
+++ 
b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.compiler.optimizer.policy;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
+import edu.snu.nemo.common.test.EmptyComponents;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public final class PolicyImplTest {
+  private DAG dag;
+
+  @Before
+  public void setUp() {
+    this.dag = EmptyComponents.buildEmptyDAG();
+  }
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testTransientPolicy() throws Exception {
+    // this should run without an exception.
+    TransientResourcePolicy.BUILDER.build().runCompileTimeOptimization(dag, 
DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testDisaggregationPolicy() throws Exception {
+    // this should run without an exception.
+    DisaggregationPolicy.BUILDER.build().runCompileTimeOptimization(dag, 
DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testDataSkewPolicy() throws Exception {
+    // this should run without an exception.
+    DataSkewPolicy.BUILDER.build().runCompileTimeOptimization(dag, 
DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testLargeShufflePolicy() throws Exception {
+    // this should run without an exception.
+    LargeShufflePolicy.BUILDER.build().runCompileTimeOptimization(dag, 
DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testTransientAndLargeShuffleCombination() throws Exception {
+    final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+    final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+    
compileTimePasses.addAll(TransientResourcePolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(TransientResourcePolicy.BUILDER.getRuntimePasses());
+    
compileTimePasses.addAll(LargeShufflePolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(LargeShufflePolicy.BUILDER.getRuntimePasses());
+
+    final Policy combinedPolicy = new PolicyImpl(compileTimePasses, 
runtimePasses);
+
+    // This should NOT throw an exception and work well together.
+    combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testTransientAndDisaggregationCombination() throws Exception {
+    final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+    final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+    
compileTimePasses.addAll(TransientResourcePolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(TransientResourcePolicy.BUILDER.getRuntimePasses());
+    
compileTimePasses.addAll(DisaggregationPolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(DisaggregationPolicy.BUILDER.getRuntimePasses());
+
+    final Policy combinedPolicy = new PolicyImpl(compileTimePasses, 
runtimePasses);
+
+    // This should throw an exception.
+    // Not all data store should be transferred from and to the GFS.
+    expectedException.expect(CompileTimeOptimizationException.class);
+    combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testDataSkewAndLargeShuffleCombination() throws Exception {
+    final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+    final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+    compileTimePasses.addAll(DataSkewPolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(DataSkewPolicy.BUILDER.getRuntimePasses());
+    
compileTimePasses.addAll(LargeShufflePolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(LargeShufflePolicy.BUILDER.getRuntimePasses());
+
+    final Policy combinedPolicy = new PolicyImpl(compileTimePasses, 
runtimePasses);
+
+    // This should throw an exception.
+    // DataSizeMetricCollection is not compatible with Push (All data have to 
be stored before the data collection).
+    expectedException.expect(CompileTimeOptimizationException.class);
+    combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+}
diff --git 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
index bf348e0..70b3275 100644
--- 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
+++ 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -102,7 +102,7 @@ public final class DefaultScheduleGroupPassTest {
       dagBuilder.addVertex(vertex);
     }
     for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
-      edge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+      edge.setProperty(DataFlowProperty.of(dataFlowModel));
       dagBuilder.connectVertices(edge);
     }
     return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
@@ -142,7 +142,7 @@ public final class DefaultScheduleGroupPassTest {
       dagBuilder.addVertex(vertex);
     }
     for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
-      edge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+      edge.setProperty(DataFlowProperty.of(dataFlowModel));
       dagBuilder.connectVertices(edge);
     }
     return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
@@ -275,7 +275,7 @@ public final class DefaultScheduleGroupPassTest {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateJoinDAG(CommunicationPatternProperty.Value.OneToOne, 
DataFlowProperty.Value.Push);
     dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
-        
.getExecutionProperties().put(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+        .setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
     pass.apply(dag.left());
     final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
     final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
diff --git 
a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java 
b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index 31ac261..618b89a 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -70,7 +70,7 @@ public final class MRJava {
   @Test(timeout = TIMEOUT)
   public void testSparkWordAndLineCount() throws Exception {
     final String inputFileName = "test_input_wordcount_spark";
-    final String outputFileName = "test_output_wordcount_spark";
+    final String outputFileName = "test_output_word_and_line_count";
     final String expectedOutputFilename = 
"expected_output_word_and_line_count";
     final String inputFilePath = fileBasePath + inputFileName;
     final String outputFilePath = fileBasePath + outputFileName;
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 364a938..ca4124b 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -299,8 +299,7 @@ public final class PhysicalPlanGenerator implements 
Function<DAG<IRVertex, IREdg
     dag.topologicalDo(stage -> {
       final int scheduleGroup = stageToScheduleGroupMap.get(stage);
       
stage.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
-      stage.getIRDAG().topologicalDo(vertex -> vertex.getExecutionProperties()
-          .put(ScheduleGroupProperty.of(scheduleGroup)));
+      stage.getIRDAG().topologicalDo(vertex -> 
vertex.setProperty(ScheduleGroupProperty.of(scheduleGroup)));
     });
   }
 
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index d6926e4..abe0a8f 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -29,7 +29,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * A function that is responsible for stage partitioning on IR DAG.
@@ -119,7 +118,8 @@ public final class StagePartitioner implements 
Function<DAG<IRVertex, IREdge>, M
    * @return set of stage-level properties for the stage
    */
   public Set<VertexExecutionProperty> getStageProperties(final IRVertex 
vertex) {
-    final Stream<VertexExecutionProperty> stream = 
vertex.getExecutionProperties().stream();
-    return stream.filter(p -> 
!ignoredPropertyKeys.contains(p.getClass())).collect(Collectors.toSet());
+    return vertex.getExecutionProperties().stream()
+        .filter(p -> !ignoredPropertyKeys.contains(p.getClass()))
+        .collect(Collectors.toSet());
   }
 }
diff --git 
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
 
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
index 8f3c957..2c46194 100644
--- 
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
+++ 
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
@@ -66,8 +66,8 @@ public final class PhysicalPlanGeneratorTest {
 
   private static final IRVertex newIRVertex(final int scheduleGroup, final int 
parallelism) {
     final IRVertex irVertex = new OperatorVertex(EMPTY_TRANSFORM);
-    
irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
-    irVertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
+    irVertex.setProperty(ScheduleGroupProperty.of(scheduleGroup));
+    irVertex.setProperty(ParallelismProperty.of(parallelism));
     return irVertex;
   }
 
@@ -75,7 +75,7 @@ public final class PhysicalPlanGeneratorTest {
                                         final 
CommunicationPatternProperty.Value communicationPattern,
                                         final DataFlowProperty.Value 
dataFlowModel) {
     final IREdge irEdge = new IREdge(communicationPattern, src, dst);
-    irEdge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+    irEdge.setProperty(DataFlowProperty.of(dataFlowModel));
     return irEdge;
   }
 }
diff --git 
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
 
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
index 5c26e45..d7eedcd 100644
--- 
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
+++ 
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -58,9 +58,9 @@ public final class StagePartitionerTest {
   private static IRVertex newVertex(final int parallelism, final int 
scheduleGroup,
                                     final List<VertexExecutionProperty> 
otherProperties) {
     final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
-    vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
-    
vertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
-    otherProperties.forEach(property -> 
vertex.getExecutionProperties().put(property));
+    vertex.setProperty(ParallelismProperty.of(parallelism));
+    vertex.setProperty(ScheduleGroupProperty.of(scheduleGroup));
+    otherProperties.forEach(property -> vertex.setProperty(property));
     return vertex;
   }
 
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index f309e49..c755543 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -481,13 +481,11 @@ public final class DataTransferTest {
 
     // Src setup
     final SourceVertex srcVertex = new 
EmptyComponents.EmptySourceVertex("Source");
-    final ExecutionPropertyMap srcVertexProperties = 
srcVertex.getExecutionProperties();
-    srcVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+    srcVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
 
     // Dst setup
     final SourceVertex dstVertex = new 
EmptyComponents.EmptySourceVertex("Destination");
-    final ExecutionPropertyMap dstVertexProperties = 
dstVertex.getExecutionProperties();
-    dstVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+    dstVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
 
     return Pair.of(srcVertex, dstVertex);
   }
@@ -503,13 +501,11 @@ public final class DataTransferTest {
 
     // Src setup
     final SourceVertex srcVertex = new 
EmptyComponents.EmptySourceVertex("Source");
-    final ExecutionPropertyMap srcVertexProperties = 
srcVertex.getExecutionProperties();
-    srcVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+    srcVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
 
     // Dst setup
     final SourceVertex dstVertex = new 
EmptyComponents.EmptySourceVertex("Destination");
-    final ExecutionPropertyMap dstVertexProperties = 
dstVertex.getExecutionProperties();
-    dstVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+    dstVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
 
     return Pair.of(srcVertex, dstVertex);
   }

Reply via email to