johnyangk closed pull request #101: [NEMO-168] Fine-grained Execution Property Check
URL: https://github.com/apache/incubator-nemo/pull/101
 
 
   

This pull request was merged from a forked repository. Because GitHub hides
the original diff of a fork-based pull request once it has been merged, the
diff is reproduced below for the sake of provenance:

diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java 
b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index ffdd468a3..295b56ccb 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.common.dag;
 
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
@@ -193,7 +194,7 @@ private void cycleCheck(final Stack<V> stack, final Set<V> 
visited, final V vert
     stack.push(vertex);
     // When we encounter a vertex that we've already gone through, then there 
is a cycle.
     if 
(outgoingEdges.get(vertex).stream().map(Edge::getDst).anyMatch(stack::contains))
 {
-      throw new RuntimeException("DAG contains a cycle");
+      throw new CompileTimeOptimizationException("DAG contains a cycle");
     } else {
       outgoingEdges.get(vertex).stream().map(Edge::getDst)
           .filter(v -> !visited.contains(v))
@@ -215,7 +216,7 @@ private void sourceCheck() {
           .filter(v -> !(v instanceof SourceVertex))
           .map(V::getId)
           .collect(Collectors.toList()).toString();
-      throw new RuntimeException("DAG source check failed while building DAG. 
" + problematicVertices);
+      throw new CompileTimeOptimizationException("DAG source check failed 
while building DAG. " + problematicVertices);
     }
   }
 
@@ -232,7 +233,7 @@ private void sinkCheck() {
       final String problematicVertices = verticesToObserve.get().filter(v ->
           !(v instanceof OperatorVertex || v instanceof LoopVertex))
           .map(V::getId).collect(Collectors.toList()).toString();
-      throw new RuntimeException("DAG sink check failed while building DAG: " 
+ problematicVertices);
+      throw new CompileTimeOptimizationException("DAG sink check failed while 
building DAG: " + problematicVertices);
     }
   }
 
@@ -245,7 +246,7 @@ private void executionPropertyCheck() {
         .filter(e -> Boolean.TRUE.equals(e.isSideInput()))
         .filter(e -> 
DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
         .forEach(e -> {
-          throw new RuntimeException("DAG execution property check: "
+          throw new CompileTimeOptimizationException("DAG execution property 
check: "
               + "SideInput edge is not compatible with push" + e.getId());
         }));
     // DataSizeMetricCollection is not compatible with Push (All data have to 
be stored before the data collection)
@@ -254,7 +255,7 @@ private void executionPropertyCheck() {
                       
.equals(e.getPropertyValue(MetricCollectionProperty.class)))
         .filter(e -> 
DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
         .forEach(e -> {
-          throw new RuntimeException("DAG execution property check: "
+          throw new CompileTimeOptimizationException("DAG execution property 
check: "
               + "DataSizeMetricCollection edge is not compatible with push" + 
e.getId());
         }));
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java 
b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
index 080fc4b59..d2c75e391 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
@@ -26,7 +26,6 @@
 
 import java.io.Serializable;
 import java.util.Optional;
-import java.util.function.Consumer;
 
 /**
  * Physical execution plan of intermediate data movement.
@@ -68,12 +67,21 @@ public IREdge(final CommunicationPatternProperty.Value 
commPattern,
 
   /**
    * Set an executionProperty of the IREdge.
-   *
    * @param executionProperty the execution property.
    * @return the IREdge with the execution property set.
    */
   public IREdge setProperty(final EdgeExecutionProperty<?> executionProperty) {
-    executionProperties.put(executionProperty);
+    executionProperties.put(executionProperty, false);
+    return this;
+  }
+
+  /**
+   * Set an executionProperty of the IREdge, permanently.
+   * @param executionProperty the execution property.
+   * @return the IREdge with the execution property set.
+   */
+  public IREdge setPropertyPermanently(final EdgeExecutionProperty<?> 
executionProperty) {
+    executionProperties.put(executionProperty, true);
     return this;
   }
 
@@ -92,7 +100,7 @@ public IREdge setProperty(final EdgeExecutionProperty<?> 
executionProperty) {
   /**
    * @return the ExecutionPropertyMap of the IREdge.
    */
-  public ExecutionPropertyMap getExecutionProperties() {
+  public ExecutionPropertyMap<EdgeExecutionProperty> getExecutionProperties() {
     return executionProperties;
   }
 
@@ -117,7 +125,7 @@ public Boolean hasSameItineraryAs(final IREdge edge) {
    * @param thatEdge the edge to copy executionProperties to.
    */
   public void copyExecutionPropertiesTo(final IREdge thatEdge) {
-    
this.getExecutionProperties().forEachProperties((Consumer<EdgeExecutionProperty>)
 thatEdge::setProperty);
+    this.getExecutionProperties().forEachProperties(thatEdge::setProperty);
   }
 
   @Override
diff --git 
a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
 
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
index a2ef8d901..869f72559 100644
--- 
a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
+++ 
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
@@ -15,11 +15,11 @@
  */
 package edu.snu.nemo.common.ir.executionproperty;
 
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -42,6 +42,7 @@
 public final class ExecutionPropertyMap<T extends ExecutionProperty> 
implements Serializable {
   private final String id;
   private final Map<Class<? extends ExecutionProperty>, T> properties = new 
HashMap<>();
+  private final Set<Class<? extends ExecutionProperty>> finalizedProperties = 
new HashSet<>();
 
   /**
    * Constructor for ExecutionPropertyMap class.
@@ -64,6 +65,8 @@ public ExecutionPropertyMap(final String id) {
     final ExecutionPropertyMap<EdgeExecutionProperty> map = new 
ExecutionPropertyMap<>(irEdge.getId());
     map.put(CommunicationPatternProperty.of(commPattern));
     map.put(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+    map.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
+    map.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY));
     switch (commPattern) {
       case Shuffle:
         
map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
@@ -105,11 +108,36 @@ public String getId() {
   }
 
   /**
-   * Put the given execution property  in the ExecutionPropertyMap.
+   * Put the given execution property  in the ExecutionPropertyMap. By 
default, it does not finalize the property.
    * @param executionProperty execution property to insert.
-   * @return the previous execution property, or null if there was no 
execution property with the specified property key
+   * @return the previous execution property, or null if there was no 
execution property
+   * with the specified property key.
    */
   public T put(final T executionProperty) {
+    return this.put(executionProperty, false);
+  }
+
+  /**
+   * Put the given execution property in the ExecutionPropertyMap.
+   * @param executionProperty execution property to insert.
+   * @param finalize whether or not to finalize the execution property.
+   * @return the previous execution property, or null if there was no 
execution property
+   * with the specified property key.
+   */
+  public T put(final T executionProperty, final Boolean finalize) {
+    // check if the property has been already finalized. We don't mind 
overwriting an identical value.
+    if (finalizedProperties.contains(executionProperty.getClass())
+        && properties.get(executionProperty.getClass()) != null
+        && 
!properties.get(executionProperty.getClass()).equals(executionProperty)) {
+      throw new CompileTimeOptimizationException("Trying to overwrite a 
finalized execution property ["
+          + executionProperty.getClass().getSimpleName() + "] from ["
+          + properties.get(executionProperty.getClass()).getValue() + "] to [" 
+ executionProperty.getValue() + "]");
+    }
+
+    // start the actual put process.
+    if (finalize) {
+      this.finalizedProperties.add(executionProperty.getClass());
+    }
     return properties.put(executionProperty.getClass(), executionProperty);
   }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java 
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index de7359b89..67fc42dc1 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -22,7 +22,6 @@
 
 import java.io.Serializable;
 import java.util.Optional;
-import java.util.function.Consumer;
 
 /**
  * The basic unit of operation in a dataflow program, as well as the most 
important data structure in Nemo.
@@ -49,7 +48,7 @@ public IRVertex() {
    * @param thatVertex the edge to copy executionProperties to.
    */
   public final void copyExecutionPropertiesTo(final IRVertex thatVertex) {
-    
this.getExecutionProperties().forEachProperties((Consumer<VertexExecutionProperty>)
 thatVertex::setProperty);
+    this.getExecutionProperties().forEachProperties(thatVertex::setProperty);
   }
 
   /**
@@ -58,7 +57,17 @@ public final void copyExecutionPropertiesTo(final IRVertex 
thatVertex) {
    * @return the IRVertex with the execution property set.
    */
   public final IRVertex setProperty(final VertexExecutionProperty<?> 
executionProperty) {
-    executionProperties.put(executionProperty);
+    executionProperties.put(executionProperty, false);
+    return this;
+  }
+
+  /**
+   * Set an executionProperty of the IRVertex, permanently.
+   * @param executionProperty new execution property.
+   * @return the IRVertex with the execution property set.
+   */
+  public final IRVertex setPropertyPermanently(final 
VertexExecutionProperty<?> executionProperty) {
+    executionProperties.put(executionProperty, true);
     return this;
   }
 
@@ -76,7 +85,7 @@ public final IRVertex setProperty(final 
VertexExecutionProperty<?> executionProp
   /**
    * @return the ExecutionPropertyMap of the IRVertex.
    */
-  public final ExecutionPropertyMap getExecutionProperties() {
+  public final ExecutionPropertyMap<VertexExecutionProperty> 
getExecutionProperties() {
     return executionProperties;
   }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java 
b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
index 5af568827..88a768a02 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
@@ -15,8 +15,14 @@
  */
 package edu.snu.nemo.common.test;
 
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 
@@ -32,6 +38,28 @@
   private EmptyComponents() {
   }
 
+  public static DAG<IRVertex, IREdge> buildEmptyDAG() {
+    DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex s = new EmptyComponents.EmptySourceVertex<>("s");
+    final IRVertex t1 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t1"));
+    final IRVertex t2 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t2"));
+    final IRVertex t3 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t3"));
+    final IRVertex t4 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t4"));
+    final IRVertex t5 = new OperatorVertex(new 
EmptyComponents.EmptyTransform("t5"));
+    dagBuilder.addVertex(s);
+    dagBuilder.addVertex(t1);
+    dagBuilder.addVertex(t2);
+    dagBuilder.addVertex(t3);
+    dagBuilder.addVertex(t4);
+    dagBuilder.addVertex(t5);
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.OneToOne, s, t1));
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.Shuffle, t1, t2));
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.OneToOne, t2, t3));
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.Shuffle, t3, t4));
+    dagBuilder.connectVertices(new 
IREdge(CommunicationPatternProperty.Value.OneToOne, t2, t5));
+    return dagBuilder.build();
+  }
+
   /**
    * An empty transform.
    * @param <I> input type.
diff --git 
a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
 
b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index d5aba035e..fc2dc0c1a 100644
--- 
a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++ 
b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -17,6 +17,7 @@
 
 import edu.snu.nemo.common.coder.DecoderFactory;
 import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -24,7 +25,9 @@
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -102,4 +105,20 @@ public void testEquality() {
     assertTrue(map0.equals(map1));
     assertTrue(map1.equals(map0));
   }
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testFinalizedProperty() {
+    // this should work without a problem..
+    final ExecutionPropertyMap<ExecutionProperty> map = new 
ExecutionPropertyMap<>("map");
+    map.put(ParallelismProperty.of(1), false);
+    assertEquals(ParallelismProperty.of(1), 
map.put(ParallelismProperty.of(2)));
+    assertEquals(ParallelismProperty.of(2), map.put(ParallelismProperty.of(3), 
true));
+
+    // test exception
+    expectedException.expect(CompileTimeOptimizationException.class);
+    map.put(ParallelismProperty.of(4));
+  }
 }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
index 6cdb1267a..e40b95a2a 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
@@ -37,7 +37,7 @@ public DefaultDataStorePass() {
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.getVertices().forEach(vertex -> {
       dag.getIncomingEdgesOf(vertex).stream()
-          .filter(edge -> 
!edge.getExecutionProperties().containsKey(DataStoreProperty.class))
+          .filter(edge -> 
!edge.getPropertyValue(DataStoreProperty.class).isPresent())
           .forEach(edge -> edge.setProperty(
               DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 190a5529a..1aa4dcf3a 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -213,7 +213,7 @@ public DefaultScheduleGroupPass(final boolean 
allowBroadcastWithinScheduleGroup,
       boolean usedCurrentScheduleGroup = false;
       for (final IRVertex irVertex : scheduleGroup.vertices) {
         if 
(!irVertex.getPropertyValue(ScheduleGroupProperty.class).isPresent()) {
-          
irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
+          
irVertex.setProperty(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
           usedCurrentScheduleGroup = true;
         }
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index e498c74d6..262f0ca48 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -39,9 +39,8 @@ public DisaggregationEdgeDataStorePass() {
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the 
DAG with GlusterFileStore.
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      inEdges.forEach(edge -> {
-        
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore));
-      });
+      inEdges.forEach(edge ->
+        
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore)));
     });
     return dag;
   }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
index 16a3b81af..d5d18acb2 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
@@ -43,9 +43,9 @@ public LargeShuffleDataFlowPass() {
       inEdges.forEach(edge -> {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push)); 
// Push to the merger vertex.
+          
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push)); 
// Push to the merger vertex.
         } else {
-          edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+          
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
index 7ebad498d..d7901f29d 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
@@ -42,7 +42,7 @@ public LargeShuffleDataPersistencePass() {
         dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
           final DataFlowProperty.Value dataFlowModel = 
irEdge.getPropertyValue(DataFlowProperty.class).get();
           if (DataFlowProperty.Value.Push.equals(dataFlowModel)) {
-            
irEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
+            
irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
           }
         }));
     return dag;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
index c80f6758b..40b4bb41f 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
@@ -46,13 +46,12 @@ public LargeShuffleDataStorePass() {
           if (CommunicationPatternProperty.Value.Shuffle
           
.equals(edgeToMerger.getPropertyValue(CommunicationPatternProperty.class).get()))
 {
             // Pass data through memory to the merger vertex.
-            edgeToMerger.setProperty(DataStoreProperty
-                .of(DataStoreProperty.Value.SerializedMemoryStore));
+            
edgeToMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
           }
         });
         dag.getOutgoingEdgesOf(vertex).forEach(edgeFromMerger ->
             // Merge the input data and write it immediately to the remote 
disk.
-            
edgeFromMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
+            
edgeFromMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
       }
     });
     return dag;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
index ed66bcdbb..978aff1bf 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
@@ -45,7 +45,7 @@ public LargeShuffleDecoderPass() {
       inEdges.forEach(edge -> {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          edge.setProperty(DecoderProperty.of(BytesDecoderFactory.of()));
+          
edge.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
index e1b31f2cf..625b2c988 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
@@ -45,12 +45,11 @@ public LargeShuffleDecompressionPass() {
       inEdges.forEach(edge -> {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          
edge.setProperty(DecompressionProperty.of(CompressionProperty.Value.None));
+          
edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.None));
 
           dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay -> {
-                
edgeFromRelay.setProperty(DecompressionProperty.of(CompressionProperty.Value.LZ4));
-              });
+              .forEach(edgeFromRelay ->
+                  
edgeFromRelay.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4)));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
index 8cad7b092..43def8b87 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
@@ -46,9 +46,8 @@ public LargeShuffleEncoderPass() {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
           dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay -> {
-                
edgeFromRelay.setProperty(EncoderProperty.of(BytesEncoderFactory.of()));
-              });
+              .forEach(edgeFromRelay ->
+                  
edgeFromRelay.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of())));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
index eb3b2bfdb..084ab6278 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
@@ -47,10 +47,9 @@ public LargeShufflePartitionerPass() {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
           dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay -> {
-                edgeFromRelay.setProperty(PartitionerProperty.of(
-                    
PartitionerProperty.Value.DedicatedKeyPerElementPartitioner));
-              });
+              .forEach(edgeFromRelay ->
+                  edgeFromRelay.setPropertyPermanently(PartitionerProperty.of(
+                      
PartitionerProperty.Value.DedicatedKeyPerElementPartitioner)));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
index 388f7c2da..f47462984 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
@@ -37,15 +37,15 @@ public LargeShuffleResourceSlotPass() {
     // On every vertex that receive push edge, if ResourceSlotProperty is not 
set, put it as false.
     // For other vertices, if ResourceSlotProperty is not set, put it as true.
     dag.getVertices().stream()
-        .filter(v -> 
!v.getExecutionProperties().containsKey(ResourceSlotProperty.class))
+        .filter(v -> 
!v.getPropertyValue(ResourceSlotProperty.class).isPresent())
         .forEach(v -> {
           if (dag.getIncomingEdgesOf(v).stream().anyMatch(
               e -> e.getPropertyValue(DataFlowProperty.class)
                   .orElseThrow(() -> new 
RuntimeException(String.format("DataFlowProperty for %s must be set",
                       e.getId()))).equals(DataFlowProperty.Value.Push))) {
-            v.getExecutionProperties().put(ResourceSlotProperty.of(false));
+            v.setPropertyPermanently(ResourceSlotProperty.of(false));
           } else {
-            v.getExecutionProperties().put(ResourceSlotProperty.of(true));
+            v.setPropertyPermanently(ResourceSlotProperty.of(true));
           }
         });
     return dag;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
index abc9300da..08ede7271 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
@@ -44,12 +44,11 @@ public LargeSuffleCompressionPass() {
       inEdges.forEach(edge -> {
         if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
             .equals(CommunicationPatternProperty.Value.Shuffle)) {
-          
edge.setProperty(CompressionProperty.of(CompressionProperty.Value.LZ4));
+          
edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
 
           dag.getOutgoingEdgesOf(edge.getDst())
-              .forEach(edgeFromRelay -> {
-                
edgeFromRelay.setProperty(CompressionProperty.of(CompressionProperty.Value.None));
-              });
+              .forEach(edgeFromRelay ->
+                  
edgeFromRelay.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.None)));
         }
       });
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
index 4862c7e8b..53f23dfc9 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
@@ -33,8 +33,8 @@ public ResourceLocalityPass() {
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     // On every vertex, if ResourceLocalityProperty is not set, put it as true.
     dag.getVertices().stream()
-        .filter(v -> 
!v.getExecutionProperties().containsKey(ResourceLocalityProperty.class))
-        .forEach(v -> 
v.getExecutionProperties().put(ResourceLocalityProperty.of(true)));
+        .filter(v -> 
!v.getPropertyValue(ResourceLocalityProperty.class).isPresent())
+        .forEach(v -> v.setProperty(ResourceLocalityProperty.of(true)));
     return dag;
   }
 }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
index 618982692..005138cdb 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
@@ -101,11 +101,11 @@ private static void assignNodeShares(
       if (inEdges.size() == 0) {
         // This vertex is root vertex.
         // Fall back to setting even distribution
-        
irVertex.getExecutionProperties().put(ResourceSiteProperty.of(EMPTY_MAP));
+        irVertex.setProperty(ResourceSiteProperty.of(EMPTY_MAP));
       } else if (isOneToOneEdge(inEdges)) {
         final Optional<HashMap<String, Integer>> property = 
inEdges.iterator().next().getSrc()
-            .getExecutionProperties().get(ResourceSiteProperty.class);
-        
irVertex.getExecutionProperties().put(ResourceSiteProperty.of(property.get()));
+            .getPropertyValue(ResourceSiteProperty.class);
+        irVertex.setProperty(ResourceSiteProperty.of(property.get()));
       } else {
         // This IRVertex has shuffle inEdge(s), or has multiple inEdges.
         final Map<String, Integer> parentLocationShares = new HashMap<>();
@@ -135,7 +135,7 @@ private static void assignNodeShares(
           shares.put(nodeName, shares.get(nodeName) + 1);
           remainder--;
         }
-        irVertex.getExecutionProperties().put(ResourceSiteProperty.of(shares));
+        irVertex.setProperty(ResourceSiteProperty.of(shares));
       }
     });
   }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
index 966fb50e8..f9718828b 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
@@ -33,8 +33,8 @@ public ResourceSlotPass() {
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     // On every vertex, if ResourceSlotProperty is not set, put it as true.
     dag.getVertices().stream()
-        .filter(v -> 
!v.getExecutionProperties().containsKey(ResourceSlotProperty.class))
-        .forEach(v -> 
v.getExecutionProperties().put(ResourceSlotProperty.of(true)));
+        .filter(v -> 
!v.getPropertyValue(ResourceSlotProperty.class).isPresent())
+        .forEach(v -> v.setProperty(ResourceSlotProperty.of(true)));
     return dag;
   }
 }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
index 6405c451e..8b1592b23 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
@@ -45,9 +45,9 @@ public SkewDataStorePass() {
         dag.getIncomingEdgesOf(v).forEach(edge -> {
           // we want it to be in the same stage
           if (edge.equals(edgeToUseMemory)) {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
           } else {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
index fe199428a..f2bf9d354 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
@@ -46,7 +46,8 @@ public SkewMetricCollectionPass() {
           // double checking.
           if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
               .equals(CommunicationPatternProperty.Value.Shuffle)) {
-            
edge.setProperty(MetricCollectionProperty.of(MetricCollectionProperty.Value.DataSkewRuntimePass));
+            edge.setPropertyPermanently(MetricCollectionProperty.of(
+                MetricCollectionProperty.Value.DataSkewRuntimePass));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
index 2b9b9030c..58c785f51 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
@@ -45,7 +45,7 @@ public SkewPartitionerPass() {
           // double checking.
           if (MetricCollectionProperty.Value.DataSkewRuntimePass
             
.equals(edge.getPropertyValue(MetricCollectionProperty.class).get())) {
-            
edge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
+            
edge.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index ab2f4be41..e2ac70fac 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -55,8 +55,8 @@ private boolean 
hasMetricCollectionBarrierVertexAsParent(final DAG<IRVertex, IRE
             .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
     dag.getVertices().stream()
         .filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
-            && 
!v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
-        .forEach(v -> 
v.getExecutionProperties().put(ResourceSkewedDataProperty.of(true)));
+            && 
!v.getPropertyValue(ResourceSkewedDataProperty.class).isPresent())
+        .forEach(v -> v.setProperty(ResourceSkewedDataProperty.of(true)));
 
     return dag;
   }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
index 3433dbe4e..4da3bcf9f 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
@@ -44,9 +44,9 @@ public TransientResourceDataFlowPass() {
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
           if (fromTransientToReserved(edge)) {
-            edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
+            
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push));
           } else {
-            edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+            
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
index bcf0d0ec5..dcd913264 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
@@ -43,12 +43,12 @@ public TransientResourceDataStorePass() {
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
           if (fromTransientToReserved(edge) || fromReservedToTransient(edge)) {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
           } else if (CommunicationPatternProperty.Value.OneToOne
               
.equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
           } else {
-            
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+            
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
           }
         });
       }
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
index fd1eb5d59..cdf25a204 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
@@ -40,12 +40,12 @@ public TransientResourcePriorityPass() {
     dag.topologicalDo(vertex -> {
       final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
       if (inEdges.isEmpty()) {
-        
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
+        
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
       } else {
         if (hasM2M(inEdges) || allO2OFromReserved(inEdges)) {
-          
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.RESERVED));
+          
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.RESERVED));
         } else {
-          
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
+          
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
         }
       }
     });
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index c2514690b..31999c8fc 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -27,7 +27,7 @@
  */
 public final class BasicPullPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new DefaultScheduleGroupPass());
   private final Policy policy;
 
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index 31a5d7b4c..5e401442f 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -28,7 +28,7 @@
  */
 public final class BasicPushPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new ShuffleEdgePushPass())
           .registerCompileTimePass(new DefaultScheduleGroupPass());
   private final Policy policy;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
index efdf14d84..807dd5747 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
@@ -31,7 +31,7 @@
  */
 public final class ConditionalLargeShufflePolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(false)
+      new PolicyBuilder()
           .registerCompileTimePass(new LargeShuffleCompositePass(), dag -> 
getMaxParallelism(dag) > 300)
           .registerCompileTimePass(new LoopOptimizationCompositePass())
           .registerCompileTimePass(new DefaultCompositePass());
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index be23e6c96..9d7eb2972 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -30,7 +30,7 @@
  */
 public final class DataSkewPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerRuntimePass(new 
DataSkewRuntimePass().setNumSkewedKeys(DataSkewRuntimePass.DEFAULT_NUM_SKEWED_KEYS),
               new SkewCompositePass())
           .registerCompileTimePass(new LoopOptimizationCompositePass())
@@ -45,7 +45,7 @@ public DataSkewPolicy() {
   }
 
   public DataSkewPolicy(final int skewness) {
-    this.policy = new PolicyBuilder(true)
+    this.policy = new PolicyBuilder()
         .registerRuntimePass(new 
DataSkewRuntimePass().setNumSkewedKeys(skewness), new SkewCompositePass())
         .registerCompileTimePass(new LoopOptimizationCompositePass())
         .registerCompileTimePass(new DefaultCompositePass())
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
index 986aec5e0..a84ac3557 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
@@ -27,7 +27,7 @@
  */
 public final class DefaultPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new DefaultCompositePass());
   private final Policy policy;
 
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
index c832d1fcb..39b658b2e 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
@@ -34,7 +34,7 @@
  */
 public final class DefaultPolicyWithSeparatePass implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new DefaultParallelismPass())
           .registerCompileTimePass(new RefactoredPass());
   private final Policy policy;
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
index 667b6705d..0d934ca74 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
@@ -29,10 +29,10 @@
  */
 public final class DisaggregationPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(false)
+      new PolicyBuilder()
+          .registerCompileTimePass(new DisaggregationEdgeDataStorePass())
           .registerCompileTimePass(new LoopOptimizationCompositePass())
-          .registerCompileTimePass(new DefaultCompositePass())
-          .registerCompileTimePass(new DisaggregationEdgeDataStorePass());
+          .registerCompileTimePass(new DefaultCompositePass());
   private final Policy policy;
 
   /**
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
index 018c403ab..57423b813 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
@@ -29,10 +29,10 @@
  */
 public final class LargeShufflePolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(false)
-        .registerCompileTimePass(new LargeShuffleCompositePass())
-        .registerCompileTimePass(new LoopOptimizationCompositePass())
-        .registerCompileTimePass(new DefaultCompositePass());
+      new PolicyBuilder()
+          .registerCompileTimePass(new LargeShuffleCompositePass())
+          .registerCompileTimePass(new LoopOptimizationCompositePass())
+          .registerCompileTimePass(new DefaultCompositePass());
   private final Policy policy;
 
   /**
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
index bf939b93e..13a250726 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -40,27 +40,15 @@
 public final class PolicyBuilder {
   private final List<CompileTimePass> compileTimePasses;
   private final List<RuntimePass<?>> runtimePasses;
-  private final Set<Class<? extends ExecutionProperty>> 
finalizedExecutionProperties;
   private final Set<Class<? extends ExecutionProperty>> 
annotatedExecutionProperties;
-  private final Boolean strictPrerequisiteCheckMode;
 
   /**
    * Default constructor.
    */
   public PolicyBuilder() {
-    this(false);
-  }
-
-  /**
-   * Constructor.
-   * @param strictPrerequisiteCheckMode whether to use strict prerequisite 
check mode or not.
-   */
-  public PolicyBuilder(final Boolean strictPrerequisiteCheckMode) {
     this.compileTimePasses = new ArrayList<>();
     this.runtimePasses = new ArrayList<>();
-    this.finalizedExecutionProperties = new HashSet<>();
     this.annotatedExecutionProperties = new HashSet<>();
-    this.strictPrerequisiteCheckMode = strictPrerequisiteCheckMode;
     // DataCommunicationPattern is already set when creating the IREdge itself.
     annotatedExecutionProperties.add(CommunicationPatternProperty.class);
     // Some default values are already annotated.
@@ -94,13 +82,7 @@ public PolicyBuilder registerCompileTimePass(final 
CompileTimePass compileTimePa
     if (compileTimePass instanceof AnnotatingPass) {
       final AnnotatingPass annotatingPass = (AnnotatingPass) compileTimePass;
       
this.annotatedExecutionProperties.add(annotatingPass.getExecutionPropertyToModify());
-      if (strictPrerequisiteCheckMode
-          && 
finalizedExecutionProperties.contains(annotatingPass.getExecutionPropertyToModify()))
 {
-        throw new 
CompileTimeOptimizationException(annotatingPass.getExecutionPropertyToModify()
-            + " should have already been finalized.");
-      }
     }
-    
finalizedExecutionProperties.addAll(compileTimePass.getPrerequisiteExecutionProperties());
 
     this.compileTimePasses.add(compileTimePass);
 
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
index caa29d99e..4528f7070 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
@@ -29,7 +29,7 @@
  */
 public final class TransientResourcePolicy implements Policy {
   public static final PolicyBuilder BUILDER =
-      new PolicyBuilder(true)
+      new PolicyBuilder()
           .registerCompileTimePass(new TransientResourceCompositePass())
           .registerCompileTimePass(new LoopOptimizationCompositePass())
           .registerCompileTimePass(new DefaultCompositePass());
diff --git a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
new file mode 100644
index 000000000..2b2c7b782
--- /dev/null
+++ b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.compiler.optimizer.policy;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
+import edu.snu.nemo.common.test.EmptyComponents;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public final class PolicyImplTest {
+  private DAG dag;
+
+  @Before
+  public void setUp() {
+    this.dag = EmptyComponents.buildEmptyDAG();
+  }
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testTransientPolicy() throws Exception {
+    // this should run without an exception.
+    TransientResourcePolicy.BUILDER.build().runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testDisaggregationPolicy() throws Exception {
+    // this should run without an exception.
+    DisaggregationPolicy.BUILDER.build().runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testDataSkewPolicy() throws Exception {
+    // this should run without an exception.
+    DataSkewPolicy.BUILDER.build().runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testLargeShufflePolicy() throws Exception {
+    // this should run without an exception.
+    LargeShufflePolicy.BUILDER.build().runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testTransientAndLargeShuffleCombination() throws Exception {
+    final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+    final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+    compileTimePasses.addAll(TransientResourcePolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(TransientResourcePolicy.BUILDER.getRuntimePasses());
+    compileTimePasses.addAll(LargeShufflePolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(LargeShufflePolicy.BUILDER.getRuntimePasses());
+
+    final Policy combinedPolicy = new PolicyImpl(compileTimePasses, runtimePasses);
+
+    // This should NOT throw an exception and work well together.
+    combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testTransientAndDisaggregationCombination() throws Exception {
+    final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+    final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+    compileTimePasses.addAll(TransientResourcePolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(TransientResourcePolicy.BUILDER.getRuntimePasses());
+    compileTimePasses.addAll(DisaggregationPolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(DisaggregationPolicy.BUILDER.getRuntimePasses());
+
+    final Policy combinedPolicy = new PolicyImpl(compileTimePasses, runtimePasses);
+
+    // This should throw an exception.
+    // Not all data store should be transferred from and to the GFS.
+    expectedException.expect(CompileTimeOptimizationException.class);
+    combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+
+  @Test
+  public void testDataSkewAndLargeShuffleCombination() throws Exception {
+    final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+    final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+    compileTimePasses.addAll(DataSkewPolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(DataSkewPolicy.BUILDER.getRuntimePasses());
+    compileTimePasses.addAll(LargeShufflePolicy.BUILDER.getCompileTimePasses());
+    runtimePasses.addAll(LargeShufflePolicy.BUILDER.getRuntimePasses());
+
+    final Policy combinedPolicy = new PolicyImpl(compileTimePasses, runtimePasses);
+
+    // This should throw an exception.
+    // DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection).
+    expectedException.expect(CompileTimeOptimizationException.class);
+    combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+  }
+}
diff --git 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
index bf348e04b..70b327580 100644
--- 
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
+++ 
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -102,7 +102,7 @@ public void testTopologicalOrdering() throws Exception {
       dagBuilder.addVertex(vertex);
     }
     for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
-      edge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+      edge.setProperty(DataFlowProperty.of(dataFlowModel));
       dagBuilder.connectVertices(edge);
     }
     return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
@@ -142,7 +142,7 @@ public void testTopologicalOrdering() throws Exception {
       dagBuilder.addVertex(vertex);
     }
     for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
-      edge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+      edge.setProperty(DataFlowProperty.of(dataFlowModel));
       dagBuilder.connectVertices(edge);
     }
     return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
@@ -275,7 +275,7 @@ public void testJoinWithSinglePush() {
     final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
         = generateJoinDAG(CommunicationPatternProperty.Value.OneToOne, 
DataFlowProperty.Value.Push);
     dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
-        
.getExecutionProperties().put(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+        .setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
     pass.apply(dag.left());
     final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
     final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
diff --git 
a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java 
b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index 31ac261c8..618b89ac8 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -70,7 +70,7 @@ public void testSparkWordCount() throws Exception {
   @Test(timeout = TIMEOUT)
   public void testSparkWordAndLineCount() throws Exception {
     final String inputFileName = "test_input_wordcount_spark";
-    final String outputFileName = "test_output_wordcount_spark";
+    final String outputFileName = "test_output_word_and_line_count";
     final String expectedOutputFilename = 
"expected_output_word_and_line_count";
     final String inputFilePath = fileBasePath + inputFileName;
     final String outputFilePath = fileBasePath + outputFileName;
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 364a9383b..ca4124bad 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -299,8 +299,7 @@ private void splitScheduleGroupByPullStageEdges(final 
DAG<Stage, StageEdge> dag)
     dag.topologicalDo(stage -> {
       final int scheduleGroup = stageToScheduleGroupMap.get(stage);
       
stage.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
-      stage.getIRDAG().topologicalDo(vertex -> vertex.getExecutionProperties()
-          .put(ScheduleGroupProperty.of(scheduleGroup)));
+      stage.getIRDAG().topologicalDo(vertex -> vertex.setProperty(ScheduleGroupProperty.of(scheduleGroup)));
     });
   }
 
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index d6926e49d..abe0a8f01 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -29,7 +29,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * A function that is responsible for stage partitioning on IR DAG.
@@ -119,7 +118,8 @@ private boolean testMergability(final IREdge edge, final 
DAG<IRVertex, IREdge> d
    * @return set of stage-level properties for the stage
    */
   public Set<VertexExecutionProperty> getStageProperties(final IRVertex 
vertex) {
-    final Stream<VertexExecutionProperty> stream = vertex.getExecutionProperties().stream();
-    return stream.filter(p -> !ignoredPropertyKeys.contains(p.getClass())).collect(Collectors.toSet());
+    return vertex.getExecutionProperties().stream()
+        .filter(p -> !ignoredPropertyKeys.contains(p.getClass()))
+        .collect(Collectors.toSet());
   }
 }
diff --git 
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
 
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
index 8f3c957a3..2c4619442 100644
--- 
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
+++ 
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
@@ -66,8 +66,8 @@ public void testSplitScheduleGroupByPullStageEdges() throws 
Exception {
 
   private static final IRVertex newIRVertex(final int scheduleGroup, final int 
parallelism) {
     final IRVertex irVertex = new OperatorVertex(EMPTY_TRANSFORM);
-    irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
-    irVertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
+    irVertex.setProperty(ScheduleGroupProperty.of(scheduleGroup));
+    irVertex.setProperty(ParallelismProperty.of(parallelism));
     return irVertex;
   }
 
@@ -75,7 +75,7 @@ private static final IREdge newIREdge(final IRVertex src, 
final IRVertex dst,
                                         final 
CommunicationPatternProperty.Value communicationPattern,
                                         final DataFlowProperty.Value 
dataFlowModel) {
     final IREdge irEdge = new IREdge(communicationPattern, src, dst);
-    irEdge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+    irEdge.setProperty(DataFlowProperty.of(dataFlowModel));
     return irEdge;
   }
 }
diff --git 
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
 
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
index 5c26e4506..d7eedcdd4 100644
--- 
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
+++ 
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -58,9 +58,9 @@ public void setup() throws InjectionException {
   private static IRVertex newVertex(final int parallelism, final int 
scheduleGroup,
                                     final List<VertexExecutionProperty> 
otherProperties) {
     final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
-    vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
-    vertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
-    otherProperties.forEach(property -> vertex.getExecutionProperties().put(property));
+    vertex.setProperty(ParallelismProperty.of(parallelism));
+    vertex.setProperty(ScheduleGroupProperty.of(scheduleGroup));
+    otherProperties.forEach(property -> vertex.setProperty(property));
     return vertex;
   }
 
diff --git 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index f309e49c0..c755543af 100644
--- 
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -481,13 +481,11 @@ private void writeAndReadWithDuplicateData(final 
BlockManagerWorker sender,
 
     // Src setup
     final SourceVertex srcVertex = new 
EmptyComponents.EmptySourceVertex("Source");
-    final ExecutionPropertyMap srcVertexProperties = srcVertex.getExecutionProperties();
-    srcVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+    srcVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
 
     // Dst setup
     final SourceVertex dstVertex = new 
EmptyComponents.EmptySourceVertex("Destination");
-    final ExecutionPropertyMap dstVertexProperties = dstVertex.getExecutionProperties();
-    dstVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+    dstVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
 
     return Pair.of(srcVertex, dstVertex);
   }
@@ -503,13 +501,11 @@ private void writeAndReadWithDuplicateData(final 
BlockManagerWorker sender,
 
     // Src setup
     final SourceVertex srcVertex = new 
EmptyComponents.EmptySourceVertex("Source");
-    final ExecutionPropertyMap srcVertexProperties = srcVertex.getExecutionProperties();
-    srcVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+    srcVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
 
     // Dst setup
     final SourceVertex dstVertex = new 
EmptyComponents.EmptySourceVertex("Destination");
-    final ExecutionPropertyMap dstVertexProperties = dstVertex.getExecutionProperties();
-    dstVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+    dstVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
 
     return Pair.of(srcVertex, dstVertex);
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to