This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new d5df47c [NEMO-168] Fine-grained Execution Property Check (#101)
d5df47c is described below
commit d5df47c11eadd7f20d4223ca411682809b88917c
Author: Won Wook SONG <[email protected]>
AuthorDate: Fri Aug 10 17:57:40 2018 +0900
[NEMO-168] Fine-grained Execution Property Check (#101)
JIRA: [NEMO-168: Fine-grained Execution Property
Check](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-168)
**Major changes:**
- Implements setPropertyPermanently method, to be able to permanently set
execution properties.
- Implements finalization of execution properties in the execution property
map.
- Removed 'strict checking' mode from PolicyBuilder, as it is no longer
necessary.
**Minor changes to note:**
- Fixes overused RuntimeExceptions into appropriate exceptions.
- Type specifications where it was neglected.
- Replaced calling getExecutionProperty then manually doing something into
getPropertyValue and setProperty methods.
**Tests for the changes:**
- I've added a buildEmptyDAG method to be able to create a mock DAG that
can be used throughout the tests.
- I've also added unit tests to test the methods newly implemented in the
execution property map.
- I've added appropriate unit tests that expect particular outputs from
various policies.
**Other comments:**
- N/A
resolves
[NEMO-168](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-168)
---
.../java/edu/snu/nemo/common/dag/DAGBuilder.java | 11 +-
.../java/edu/snu/nemo/common/ir/edge/IREdge.java | 18 +++-
.../ir/executionproperty/ExecutionPropertyMap.java | 40 +++++--
.../edu/snu/nemo/common/ir/vertex/IRVertex.java | 17 ++-
.../edu/snu/nemo/common/test/EmptyComponents.java | 28 +++++
.../ExecutionPropertyMapTest.java | 19 ++++
.../annotating/DefaultDataStorePass.java | 2 +-
.../annotating/DefaultScheduleGroupPass.java | 2 +-
.../DisaggregationEdgeDataStorePass.java | 5 +-
.../annotating/LargeShuffleDataFlowPass.java | 4 +-
.../LargeShuffleDataPersistencePass.java | 2 +-
.../annotating/LargeShuffleDataStorePass.java | 5 +-
.../annotating/LargeShuffleDecoderPass.java | 2 +-
.../annotating/LargeShuffleDecompressionPass.java | 7 +-
.../annotating/LargeShuffleEncoderPass.java | 5 +-
.../annotating/LargeShufflePartitionerPass.java | 7 +-
.../annotating/LargeShuffleResourceSlotPass.java | 6 +-
.../annotating/LargeSuffleCompressionPass.java | 7 +-
.../annotating/ResourceLocalityPass.java | 4 +-
.../compiletime/annotating/ResourceSitePass.java | 8 +-
.../compiletime/annotating/ResourceSlotPass.java | 4 +-
.../compiletime/annotating/SkewDataStorePass.java | 4 +-
.../annotating/SkewMetricCollectionPass.java | 3 +-
.../annotating/SkewPartitionerPass.java | 2 +-
.../annotating/SkewResourceSkewedDataPass.java | 4 +-
.../annotating/TransientResourceDataFlowPass.java | 4 +-
.../annotating/TransientResourceDataStorePass.java | 6 +-
.../annotating/TransientResourcePriorityPass.java | 6 +-
.../compiler/optimizer/policy/BasicPullPolicy.java | 2 +-
.../compiler/optimizer/policy/BasicPushPolicy.java | 2 +-
.../policy/ConditionalLargeShufflePolicy.java | 2 +-
.../compiler/optimizer/policy/DataSkewPolicy.java | 4 +-
.../compiler/optimizer/policy/DefaultPolicy.java | 2 +-
.../policy/DefaultPolicyWithSeparatePass.java | 2 +-
.../optimizer/policy/DisaggregationPolicy.java | 6 +-
.../optimizer/policy/LargeShufflePolicy.java | 8 +-
.../compiler/optimizer/policy/PolicyBuilder.java | 18 ----
.../optimizer/policy/TransientResourcePolicy.java | 2 +-
.../compiler/optimizer/policy/PolicyImplTest.java | 115 +++++++++++++++++++++
.../annotating/DefaultScheduleGroupPassTest.java | 6 +-
.../java/edu/snu/nemo/examples/spark/MRJava.java | 2 +-
.../runtime/common/plan/PhysicalPlanGenerator.java | 3 +-
.../nemo/runtime/common/plan/StagePartitioner.java | 6 +-
.../common/plan/PhysicalPlanGeneratorTest.java | 6 +-
.../runtime/common/plan/StagePartitionerTest.java | 6 +-
.../executor/datatransfer/DataTransferTest.java | 12 +--
46 files changed, 308 insertions(+), 128 deletions(-)
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index ffdd468..295b56c 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.common.dag;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
@@ -193,7 +194,7 @@ public final class DAGBuilder<V extends Vertex, E extends
Edge<V>> implements Se
stack.push(vertex);
// When we encounter a vertex that we've already gone through, then there
is a cycle.
if
(outgoingEdges.get(vertex).stream().map(Edge::getDst).anyMatch(stack::contains))
{
- throw new RuntimeException("DAG contains a cycle");
+ throw new CompileTimeOptimizationException("DAG contains a cycle");
} else {
outgoingEdges.get(vertex).stream().map(Edge::getDst)
.filter(v -> !visited.contains(v))
@@ -215,7 +216,7 @@ public final class DAGBuilder<V extends Vertex, E extends
Edge<V>> implements Se
.filter(v -> !(v instanceof SourceVertex))
.map(V::getId)
.collect(Collectors.toList()).toString();
- throw new RuntimeException("DAG source check failed while building DAG.
" + problematicVertices);
+ throw new CompileTimeOptimizationException("DAG source check failed
while building DAG. " + problematicVertices);
}
}
@@ -232,7 +233,7 @@ public final class DAGBuilder<V extends Vertex, E extends
Edge<V>> implements Se
final String problematicVertices = verticesToObserve.get().filter(v ->
!(v instanceof OperatorVertex || v instanceof LoopVertex))
.map(V::getId).collect(Collectors.toList()).toString();
- throw new RuntimeException("DAG sink check failed while building DAG: "
+ problematicVertices);
+ throw new CompileTimeOptimizationException("DAG sink check failed while
building DAG: " + problematicVertices);
}
}
@@ -245,7 +246,7 @@ public final class DAGBuilder<V extends Vertex, E extends
Edge<V>> implements Se
.filter(e -> Boolean.TRUE.equals(e.isSideInput()))
.filter(e ->
DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
.forEach(e -> {
- throw new RuntimeException("DAG execution property check: "
+ throw new CompileTimeOptimizationException("DAG execution property
check: "
+ "SideInput edge is not compatible with push" + e.getId());
}));
// DataSizeMetricCollection is not compatible with Push (All data have to
be stored before the data collection)
@@ -254,7 +255,7 @@ public final class DAGBuilder<V extends Vertex, E extends
Edge<V>> implements Se
.equals(e.getPropertyValue(MetricCollectionProperty.class)))
.filter(e ->
DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
.forEach(e -> {
- throw new RuntimeException("DAG execution property check: "
+ throw new CompileTimeOptimizationException("DAG execution property
check: "
+ "DataSizeMetricCollection edge is not compatible with push" +
e.getId());
}));
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
index 080fc4b..d2c75e3 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/IREdge.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
import java.io.Serializable;
import java.util.Optional;
-import java.util.function.Consumer;
/**
* Physical execution plan of intermediate data movement.
@@ -68,12 +67,21 @@ public final class IREdge extends Edge<IRVertex> {
/**
* Set an executionProperty of the IREdge.
- *
* @param executionProperty the execution property.
* @return the IREdge with the execution property set.
*/
public IREdge setProperty(final EdgeExecutionProperty<?> executionProperty) {
- executionProperties.put(executionProperty);
+ executionProperties.put(executionProperty, false);
+ return this;
+ }
+
+ /**
+ * Set an executionProperty of the IREdge, permanently.
+ * @param executionProperty the execution property.
+ * @return the IREdge with the execution property set.
+ */
+ public IREdge setPropertyPermanently(final EdgeExecutionProperty<?>
executionProperty) {
+ executionProperties.put(executionProperty, true);
return this;
}
@@ -92,7 +100,7 @@ public final class IREdge extends Edge<IRVertex> {
/**
* @return the ExecutionPropertyMap of the IREdge.
*/
- public ExecutionPropertyMap getExecutionProperties() {
+ public ExecutionPropertyMap<EdgeExecutionProperty> getExecutionProperties() {
return executionProperties;
}
@@ -117,7 +125,7 @@ public final class IREdge extends Edge<IRVertex> {
* @param thatEdge the edge to copy executionProperties to.
*/
public void copyExecutionPropertiesTo(final IREdge thatEdge) {
-
this.getExecutionProperties().forEachProperties((Consumer<EdgeExecutionProperty>)
thatEdge::setProperty);
+ this.getExecutionProperties().forEachProperties(thatEdge::setProperty);
}
@Override
diff --git
a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
index a2ef8d9..869f725 100644
---
a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
+++
b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMap.java
@@ -15,11 +15,11 @@
*/
package edu.snu.nemo.common.ir.executionproperty;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
import edu.snu.nemo.common.ir.edge.IREdge;
-import
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import
edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -42,6 +42,7 @@ import java.util.stream.Stream;
public final class ExecutionPropertyMap<T extends ExecutionProperty>
implements Serializable {
private final String id;
private final Map<Class<? extends ExecutionProperty>, T> properties = new
HashMap<>();
+ private final Set<Class<? extends ExecutionProperty>> finalizedProperties =
new HashSet<>();
/**
* Constructor for ExecutionPropertyMap class.
@@ -64,6 +65,8 @@ public final class ExecutionPropertyMap<T extends
ExecutionProperty> implements
final ExecutionPropertyMap<EdgeExecutionProperty> map = new
ExecutionPropertyMap<>(irEdge.getId());
map.put(CommunicationPatternProperty.of(commPattern));
map.put(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+ map.put(EncoderProperty.of(EncoderFactory.DUMMY_ENCODER_FACTORY));
+ map.put(DecoderProperty.of(DecoderFactory.DUMMY_DECODER_FACTORY));
switch (commPattern) {
case Shuffle:
map.put(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
@@ -105,11 +108,36 @@ public final class ExecutionPropertyMap<T extends
ExecutionProperty> implements
}
/**
- * Put the given execution property in the ExecutionPropertyMap.
+ * Put the given execution property in the ExecutionPropertyMap. By
default, it does not finalize the property.
* @param executionProperty execution property to insert.
- * @return the previous execution property, or null if there was no
execution property with the specified property key
+ * @return the previous execution property, or null if there was no
execution property
+ * with the specified property key.
*/
public T put(final T executionProperty) {
+ return this.put(executionProperty, false);
+ }
+
+ /**
+ * Put the given execution property in the ExecutionPropertyMap.
+ * @param executionProperty execution property to insert.
+ * @param finalize whether or not to finalize the execution property.
+ * @return the previous execution property, or null if there was no
execution property
+ * with the specified property key.
+ */
+ public T put(final T executionProperty, final Boolean finalize) {
+ // check if the property has been already finalized. We don't mind
overwriting an identical value.
+ if (finalizedProperties.contains(executionProperty.getClass())
+ && properties.get(executionProperty.getClass()) != null
+ &&
!properties.get(executionProperty.getClass()).equals(executionProperty)) {
+ throw new CompileTimeOptimizationException("Trying to overwrite a
finalized execution property ["
+ + executionProperty.getClass().getSimpleName() + "] from ["
+ + properties.get(executionProperty.getClass()).getValue() + "] to ["
+ executionProperty.getValue() + "]");
+ }
+
+ // start the actual put process.
+ if (finalize) {
+ this.finalizedProperties.add(executionProperty.getClass());
+ }
return properties.put(executionProperty.getClass(), executionProperty);
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index de7359b..67fc42d 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -22,7 +22,6 @@ import
edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import java.io.Serializable;
import java.util.Optional;
-import java.util.function.Consumer;
/**
* The basic unit of operation in a dataflow program, as well as the most
important data structure in Nemo.
@@ -49,7 +48,7 @@ public abstract class IRVertex extends Vertex {
* @param thatVertex the edge to copy executionProperties to.
*/
public final void copyExecutionPropertiesTo(final IRVertex thatVertex) {
-
this.getExecutionProperties().forEachProperties((Consumer<VertexExecutionProperty>)
thatVertex::setProperty);
+ this.getExecutionProperties().forEachProperties(thatVertex::setProperty);
}
/**
@@ -58,7 +57,17 @@ public abstract class IRVertex extends Vertex {
* @return the IRVertex with the execution property set.
*/
public final IRVertex setProperty(final VertexExecutionProperty<?>
executionProperty) {
- executionProperties.put(executionProperty);
+ executionProperties.put(executionProperty, false);
+ return this;
+ }
+
+ /**
+ * Set an executionProperty of the IRVertex, permanently.
+ * @param executionProperty new execution property.
+ * @return the IRVertex with the execution property set.
+ */
+ public final IRVertex setPropertyPermanently(final
VertexExecutionProperty<?> executionProperty) {
+ executionProperties.put(executionProperty, true);
return this;
}
@@ -76,7 +85,7 @@ public abstract class IRVertex extends Vertex {
/**
* @return the ExecutionPropertyMap of the IRVertex.
*/
- public final ExecutionPropertyMap getExecutionProperties() {
+ public final ExecutionPropertyMap<VertexExecutionProperty>
getExecutionProperties() {
return executionProperties;
}
diff --git a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
index 5af5688..88a768a 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
@@ -15,8 +15,14 @@
*/
package edu.snu.nemo.common.test;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.OutputCollector;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
import edu.snu.nemo.common.ir.vertex.SourceVertex;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
@@ -32,6 +38,28 @@ public final class EmptyComponents {
private EmptyComponents() {
}
+ public static DAG<IRVertex, IREdge> buildEmptyDAG() {
+ DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+ final IRVertex s = new EmptyComponents.EmptySourceVertex<>("s");
+ final IRVertex t1 = new OperatorVertex(new
EmptyComponents.EmptyTransform("t1"));
+ final IRVertex t2 = new OperatorVertex(new
EmptyComponents.EmptyTransform("t2"));
+ final IRVertex t3 = new OperatorVertex(new
EmptyComponents.EmptyTransform("t3"));
+ final IRVertex t4 = new OperatorVertex(new
EmptyComponents.EmptyTransform("t4"));
+ final IRVertex t5 = new OperatorVertex(new
EmptyComponents.EmptyTransform("t5"));
+ dagBuilder.addVertex(s);
+ dagBuilder.addVertex(t1);
+ dagBuilder.addVertex(t2);
+ dagBuilder.addVertex(t3);
+ dagBuilder.addVertex(t4);
+ dagBuilder.addVertex(t5);
+ dagBuilder.connectVertices(new
IREdge(CommunicationPatternProperty.Value.OneToOne, s, t1));
+ dagBuilder.connectVertices(new
IREdge(CommunicationPatternProperty.Value.Shuffle, t1, t2));
+ dagBuilder.connectVertices(new
IREdge(CommunicationPatternProperty.Value.OneToOne, t2, t3));
+ dagBuilder.connectVertices(new
IREdge(CommunicationPatternProperty.Value.Shuffle, t3, t4));
+ dagBuilder.connectVertices(new
IREdge(CommunicationPatternProperty.Value.OneToOne, t2, t5));
+ return dagBuilder.build();
+ }
+
/**
* An empty transform.
* @param <I> input type.
diff --git
a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
index d5aba03..fc2dc0c 100644
---
a/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
+++
b/common/src/test/java/edu/snu/nemo/common/ir/executionproperty/ExecutionPropertyMapTest.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.common.ir.executionproperty;
import edu.snu.nemo.common.coder.DecoderFactory;
import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.*;
import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -24,7 +25,9 @@ import edu.snu.nemo.common.ir.vertex.OperatorVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import edu.snu.nemo.common.test.EmptyComponents;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -102,4 +105,20 @@ public class ExecutionPropertyMapTest {
assertTrue(map0.equals(map1));
assertTrue(map1.equals(map0));
}
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testFinalizedProperty() {
+ // this should work without a problem..
+ final ExecutionPropertyMap<ExecutionProperty> map = new
ExecutionPropertyMap<>("map");
+ map.put(ParallelismProperty.of(1), false);
+ assertEquals(ParallelismProperty.of(1),
map.put(ParallelismProperty.of(2)));
+ assertEquals(ParallelismProperty.of(2), map.put(ParallelismProperty.of(3),
true));
+
+ // test exception
+ expectedException.expect(CompileTimeOptimizationException.class);
+ map.put(ParallelismProperty.of(4));
+ }
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
index 6cdb126..e40b95a 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultDataStorePass.java
@@ -37,7 +37,7 @@ public final class DefaultDataStorePass extends
AnnotatingPass {
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
dag.getVertices().forEach(vertex -> {
dag.getIncomingEdgesOf(vertex).stream()
- .filter(edge ->
!edge.getExecutionProperties().containsKey(DataStoreProperty.class))
+ .filter(edge ->
!edge.getPropertyValue(DataStoreProperty.class).isPresent())
.forEach(edge -> edge.setProperty(
DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
});
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
index 190a552..1aa4dcf 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java
@@ -213,7 +213,7 @@ public final class DefaultScheduleGroupPass extends
AnnotatingPass {
boolean usedCurrentScheduleGroup = false;
for (final IRVertex irVertex : scheduleGroup.vertices) {
if
(!irVertex.getPropertyValue(ScheduleGroupProperty.class).isPresent()) {
-
irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
+
irVertex.setProperty(ScheduleGroupProperty.of(currentScheduleGroup.getValue()));
usedCurrentScheduleGroup = true;
}
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
index e498c74..262f0ca 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DisaggregationEdgeDataStorePass.java
@@ -39,9 +39,8 @@ public final class DisaggregationEdgeDataStorePass extends
AnnotatingPass {
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
dag.getVertices().forEach(vertex -> { // Initialize the DataStore of the
DAG with GlusterFileStore.
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
- inEdges.forEach(edge -> {
-
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore));
- });
+ inEdges.forEach(edge ->
+
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.GlusterFileStore)));
});
return dag;
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
index 16a3b81..d5d18ac 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataFlowPass.java
@@ -43,9 +43,9 @@ public final class LargeShuffleDataFlowPass extends
AnnotatingPass {
inEdges.forEach(edge -> {
if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
.equals(CommunicationPatternProperty.Value.Shuffle)) {
- edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
// Push to the merger vertex.
+
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push));
// Push to the merger vertex.
} else {
- edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
}
});
});
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
index 7ebad49..d7901f2 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataPersistencePass.java
@@ -42,7 +42,7 @@ public final class LargeShuffleDataPersistencePass extends
AnnotatingPass {
dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
final DataFlowProperty.Value dataFlowModel =
irEdge.getPropertyValue(DataFlowProperty.class).get();
if (DataFlowProperty.Value.Push.equals(dataFlowModel)) {
-
irEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
+
irEdge.setPropertyPermanently(DataPersistenceProperty.of(DataPersistenceProperty.Value.Discard));
}
}));
return dag;
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
index c80f675..40b4bb4 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDataStorePass.java
@@ -46,13 +46,12 @@ public final class LargeShuffleDataStorePass extends
AnnotatingPass {
if (CommunicationPatternProperty.Value.Shuffle
.equals(edgeToMerger.getPropertyValue(CommunicationPatternProperty.class).get()))
{
// Pass data through memory to the merger vertex.
- edgeToMerger.setProperty(DataStoreProperty
- .of(DataStoreProperty.Value.SerializedMemoryStore));
+
edgeToMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.SerializedMemoryStore));
}
});
dag.getOutgoingEdgesOf(vertex).forEach(edgeFromMerger ->
// Merge the input data and write it immediately to the remote
disk.
-
edgeFromMerger.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
+
edgeFromMerger.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore)));
}
});
return dag;
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
index ed66bcd..978aff1 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecoderPass.java
@@ -45,7 +45,7 @@ public final class LargeShuffleDecoderPass extends
AnnotatingPass {
inEdges.forEach(edge -> {
if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
.equals(CommunicationPatternProperty.Value.Shuffle)) {
- edge.setProperty(DecoderProperty.of(BytesDecoderFactory.of()));
+
edge.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
}
});
});
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
index e1b31f2..625b2c9 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleDecompressionPass.java
@@ -45,12 +45,11 @@ public final class LargeShuffleDecompressionPass extends
AnnotatingPass {
inEdges.forEach(edge -> {
if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
.equals(CommunicationPatternProperty.Value.Shuffle)) {
-
edge.setProperty(DecompressionProperty.of(CompressionProperty.Value.None));
+
edge.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.None));
dag.getOutgoingEdgesOf(edge.getDst())
- .forEach(edgeFromRelay -> {
-
edgeFromRelay.setProperty(DecompressionProperty.of(CompressionProperty.Value.LZ4));
- });
+ .forEach(edgeFromRelay ->
+
edgeFromRelay.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4)));
}
});
});
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
index 8cad7b0..43def8b 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleEncoderPass.java
@@ -46,9 +46,8 @@ public final class LargeShuffleEncoderPass extends
AnnotatingPass {
if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
.equals(CommunicationPatternProperty.Value.Shuffle)) {
dag.getOutgoingEdgesOf(edge.getDst())
- .forEach(edgeFromRelay -> {
-
edgeFromRelay.setProperty(EncoderProperty.of(BytesEncoderFactory.of()));
- });
+ .forEach(edgeFromRelay ->
+
edgeFromRelay.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of())));
}
});
});
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
index eb3b2bf..084ab62 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShufflePartitionerPass.java
@@ -47,10 +47,9 @@ public final class LargeShufflePartitionerPass extends
AnnotatingPass {
if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
.equals(CommunicationPatternProperty.Value.Shuffle)) {
dag.getOutgoingEdgesOf(edge.getDst())
- .forEach(edgeFromRelay -> {
- edgeFromRelay.setProperty(PartitionerProperty.of(
-
PartitionerProperty.Value.DedicatedKeyPerElementPartitioner));
- });
+ .forEach(edgeFromRelay ->
+ edgeFromRelay.setPropertyPermanently(PartitionerProperty.of(
+
PartitionerProperty.Value.DedicatedKeyPerElementPartitioner)));
}
});
});
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
index 388f7c2..f474629 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeShuffleResourceSlotPass.java
@@ -37,15 +37,15 @@ public final class LargeShuffleResourceSlotPass extends
AnnotatingPass {
// On every vertex that receive push edge, if ResourceSlotProperty is not
set, put it as false.
// For other vertices, if ResourceSlotProperty is not set, put it as true.
dag.getVertices().stream()
- .filter(v ->
!v.getExecutionProperties().containsKey(ResourceSlotProperty.class))
+ .filter(v ->
!v.getPropertyValue(ResourceSlotProperty.class).isPresent())
.forEach(v -> {
if (dag.getIncomingEdgesOf(v).stream().anyMatch(
e -> e.getPropertyValue(DataFlowProperty.class)
.orElseThrow(() -> new
RuntimeException(String.format("DataFlowProperty for %s must be set",
e.getId()))).equals(DataFlowProperty.Value.Push))) {
- v.getExecutionProperties().put(ResourceSlotProperty.of(false));
+ v.setPropertyPermanently(ResourceSlotProperty.of(false));
} else {
- v.getExecutionProperties().put(ResourceSlotProperty.of(true));
+ v.setPropertyPermanently(ResourceSlotProperty.of(true));
}
});
return dag;
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
index abc9300..08ede72 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/LargeSuffleCompressionPass.java
@@ -44,12 +44,11 @@ public final class LargeSuffleCompressionPass extends
AnnotatingPass {
inEdges.forEach(edge -> {
if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
.equals(CommunicationPatternProperty.Value.Shuffle)) {
-
edge.setProperty(CompressionProperty.of(CompressionProperty.Value.LZ4));
+
edge.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
dag.getOutgoingEdgesOf(edge.getDst())
- .forEach(edgeFromRelay -> {
-
edgeFromRelay.setProperty(CompressionProperty.of(CompressionProperty.Value.None));
- });
+ .forEach(edgeFromRelay ->
+
edgeFromRelay.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.None)));
}
});
});
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
index 4862c7e..53f23df 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceLocalityPass.java
@@ -33,8 +33,8 @@ public final class ResourceLocalityPass extends
AnnotatingPass {
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
// On every vertex, if ResourceLocalityProperty is not set, put it as true.
dag.getVertices().stream()
- .filter(v ->
!v.getExecutionProperties().containsKey(ResourceLocalityProperty.class))
- .forEach(v ->
v.getExecutionProperties().put(ResourceLocalityProperty.of(true)));
+ .filter(v ->
!v.getPropertyValue(ResourceLocalityProperty.class).isPresent())
+ .forEach(v -> v.setProperty(ResourceLocalityProperty.of(true)));
return dag;
}
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
index 6189826..005138c 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSitePass.java
@@ -101,11 +101,11 @@ public final class ResourceSitePass extends
AnnotatingPass {
if (inEdges.size() == 0) {
// This vertex is root vertex.
// Fall back to setting even distribution
-
irVertex.getExecutionProperties().put(ResourceSiteProperty.of(EMPTY_MAP));
+ irVertex.setProperty(ResourceSiteProperty.of(EMPTY_MAP));
} else if (isOneToOneEdge(inEdges)) {
final Optional<HashMap<String, Integer>> property =
inEdges.iterator().next().getSrc()
- .getExecutionProperties().get(ResourceSiteProperty.class);
-
irVertex.getExecutionProperties().put(ResourceSiteProperty.of(property.get()));
+ .getPropertyValue(ResourceSiteProperty.class);
+ irVertex.setProperty(ResourceSiteProperty.of(property.get()));
} else {
// This IRVertex has shuffle inEdge(s), or has multiple inEdges.
final Map<String, Integer> parentLocationShares = new HashMap<>();
@@ -135,7 +135,7 @@ public final class ResourceSitePass extends AnnotatingPass {
shares.put(nodeName, shares.get(nodeName) + 1);
remainder--;
}
- irVertex.getExecutionProperties().put(ResourceSiteProperty.of(shares));
+ irVertex.setProperty(ResourceSiteProperty.of(shares));
}
});
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
index 966fb50..f971882 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ResourceSlotPass.java
@@ -33,8 +33,8 @@ public final class ResourceSlotPass extends AnnotatingPass {
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
// On every vertex, if ResourceSlotProperty is not set, put it as true.
dag.getVertices().stream()
- .filter(v ->
!v.getExecutionProperties().containsKey(ResourceSlotProperty.class))
- .forEach(v ->
v.getExecutionProperties().put(ResourceSlotProperty.of(true)));
+ .filter(v ->
!v.getPropertyValue(ResourceSlotProperty.class).isPresent())
+ .forEach(v -> v.setProperty(ResourceSlotProperty.of(true)));
return dag;
}
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
index 6405c45..8b1592b 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
@@ -45,9 +45,9 @@ public final class SkewDataStorePass extends AnnotatingPass {
dag.getIncomingEdgesOf(v).forEach(edge -> {
// we want it to be in the same stage
if (edge.equals(edgeToUseMemory)) {
-
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
} else {
-
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
}
});
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
index fe19942..f2bf9d3 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
@@ -46,7 +46,8 @@ public final class SkewMetricCollectionPass extends
AnnotatingPass {
// double checking.
if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
.equals(CommunicationPatternProperty.Value.Shuffle)) {
-
edge.setProperty(MetricCollectionProperty.of(MetricCollectionProperty.Value.DataSkewRuntimePass));
+ edge.setPropertyPermanently(MetricCollectionProperty.of(
+ MetricCollectionProperty.Value.DataSkewRuntimePass));
}
});
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
index 2b9b903..58c785f 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
@@ -45,7 +45,7 @@ public final class SkewPartitionerPass extends AnnotatingPass
{
// double checking.
if (MetricCollectionProperty.Value.DataSkewRuntimePass
.equals(edge.getPropertyValue(MetricCollectionProperty.class).get())) {
-
edge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
+
edge.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
}
});
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index ab2f4be..e2ac70f 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -55,8 +55,8 @@ public final class SkewResourceSkewedDataPass extends
AnnotatingPass {
.of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
dag.getVertices().stream()
.filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
- &&
!v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
- .forEach(v ->
v.getExecutionProperties().put(ResourceSkewedDataProperty.of(true)));
+ &&
!v.getPropertyValue(ResourceSkewedDataProperty.class).isPresent())
+ .forEach(v -> v.setProperty(ResourceSkewedDataProperty.of(true)));
return dag;
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
index 3433dbe..4da3bcf 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
@@ -44,9 +44,9 @@ public final class TransientResourceDataFlowPass extends
AnnotatingPass {
if (!inEdges.isEmpty()) {
inEdges.forEach(edge -> {
if (fromTransientToReserved(edge)) {
- edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
+
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Push));
} else {
- edge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+
edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.Pull));
}
});
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
index bcf0d0e..dcd9132 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
@@ -43,12 +43,12 @@ public final class TransientResourceDataStorePass extends
AnnotatingPass {
if (!inEdges.isEmpty()) {
inEdges.forEach(edge -> {
if (fromTransientToReserved(edge) || fromReservedToTransient(edge)) {
-
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
} else if (CommunicationPatternProperty.Value.OneToOne
.equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
-
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
+
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
} else {
-
edge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+
edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
}
});
}
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
index fd1eb5d..cdf25a2 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourcePriorityPass.java
@@ -40,12 +40,12 @@ public final class TransientResourcePriorityPass extends
AnnotatingPass {
dag.topologicalDo(vertex -> {
final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
if (inEdges.isEmpty()) {
-
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
+
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
} else {
if (hasM2M(inEdges) || allO2OFromReserved(inEdges)) {
-
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.RESERVED));
+
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.RESERVED));
} else {
-
vertex.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
+
vertex.setPropertyPermanently(ResourcePriorityProperty.of(ResourcePriorityProperty.TRANSIENT));
}
}
});
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
index c251469..31999c8 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java
@@ -27,7 +27,7 @@ import org.apache.reef.tang.Injector;
*/
public final class BasicPullPolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(true)
+ new PolicyBuilder()
.registerCompileTimePass(new DefaultScheduleGroupPass());
private final Policy policy;
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
index 31a5d7b..5e40144 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java
@@ -28,7 +28,7 @@ import org.apache.reef.tang.Injector;
*/
public final class BasicPushPolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(true)
+ new PolicyBuilder()
.registerCompileTimePass(new ShuffleEdgePushPass())
.registerCompileTimePass(new DefaultScheduleGroupPass());
private final Policy policy;
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
index efdf14d..807dd57 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/ConditionalLargeShufflePolicy.java
@@ -31,7 +31,7 @@ import org.apache.reef.tang.Injector;
*/
public final class ConditionalLargeShufflePolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(false)
+ new PolicyBuilder()
.registerCompileTimePass(new LargeShuffleCompositePass(), dag ->
getMaxParallelism(dag) > 300)
.registerCompileTimePass(new LoopOptimizationCompositePass())
.registerCompileTimePass(new DefaultCompositePass());
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index be23e6c..9d7eb29 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -30,7 +30,7 @@ import org.apache.reef.tang.Injector;
*/
public final class DataSkewPolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(true)
+ new PolicyBuilder()
.registerRuntimePass(new
DataSkewRuntimePass().setNumSkewedKeys(DataSkewRuntimePass.DEFAULT_NUM_SKEWED_KEYS),
new SkewCompositePass())
.registerCompileTimePass(new LoopOptimizationCompositePass())
@@ -45,7 +45,7 @@ public final class DataSkewPolicy implements Policy {
}
public DataSkewPolicy(final int skewness) {
- this.policy = new PolicyBuilder(true)
+ this.policy = new PolicyBuilder()
.registerRuntimePass(new
DataSkewRuntimePass().setNumSkewedKeys(skewness), new SkewCompositePass())
.registerCompileTimePass(new LoopOptimizationCompositePass())
.registerCompileTimePass(new DefaultCompositePass())
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
index 986aec5..a84ac35 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicy.java
@@ -27,7 +27,7 @@ import org.apache.reef.tang.Injector;
*/
public final class DefaultPolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(true)
+ new PolicyBuilder()
.registerCompileTimePass(new DefaultCompositePass());
private final Policy policy;
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
index c832d1f..39b658b 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DefaultPolicyWithSeparatePass.java
@@ -34,7 +34,7 @@ import java.util.Arrays;
*/
public final class DefaultPolicyWithSeparatePass implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(true)
+ new PolicyBuilder()
.registerCompileTimePass(new DefaultParallelismPass())
.registerCompileTimePass(new RefactoredPass());
private final Policy policy;
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
index 667b670..0d934ca 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DisaggregationPolicy.java
@@ -29,10 +29,10 @@ import org.apache.reef.tang.Injector;
*/
public final class DisaggregationPolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(false)
+ new PolicyBuilder()
+ .registerCompileTimePass(new DisaggregationEdgeDataStorePass())
.registerCompileTimePass(new LoopOptimizationCompositePass())
- .registerCompileTimePass(new DefaultCompositePass())
- .registerCompileTimePass(new DisaggregationEdgeDataStorePass());
+ .registerCompileTimePass(new DefaultCompositePass());
private final Policy policy;
/**
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
index 018c403..57423b8 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/LargeShufflePolicy.java
@@ -29,10 +29,10 @@ import org.apache.reef.tang.Injector;
*/
public final class LargeShufflePolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(false)
- .registerCompileTimePass(new LargeShuffleCompositePass())
- .registerCompileTimePass(new LoopOptimizationCompositePass())
- .registerCompileTimePass(new DefaultCompositePass());
+ new PolicyBuilder()
+ .registerCompileTimePass(new LargeShuffleCompositePass())
+ .registerCompileTimePass(new LoopOptimizationCompositePass())
+ .registerCompileTimePass(new DefaultCompositePass());
private final Policy policy;
/**
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
index bf939b9..13a2507 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -40,27 +40,15 @@ import java.util.function.Predicate;
public final class PolicyBuilder {
private final List<CompileTimePass> compileTimePasses;
private final List<RuntimePass<?>> runtimePasses;
- private final Set<Class<? extends ExecutionProperty>>
finalizedExecutionProperties;
private final Set<Class<? extends ExecutionProperty>>
annotatedExecutionProperties;
- private final Boolean strictPrerequisiteCheckMode;
/**
* Default constructor.
*/
public PolicyBuilder() {
- this(false);
- }
-
- /**
- * Constructor.
- * @param strictPrerequisiteCheckMode whether to use strict prerequisite
check mode or not.
- */
- public PolicyBuilder(final Boolean strictPrerequisiteCheckMode) {
this.compileTimePasses = new ArrayList<>();
this.runtimePasses = new ArrayList<>();
- this.finalizedExecutionProperties = new HashSet<>();
this.annotatedExecutionProperties = new HashSet<>();
- this.strictPrerequisiteCheckMode = strictPrerequisiteCheckMode;
// DataCommunicationPattern is already set when creating the IREdge itself.
annotatedExecutionProperties.add(CommunicationPatternProperty.class);
// Some default values are already annotated.
@@ -94,13 +82,7 @@ public final class PolicyBuilder {
if (compileTimePass instanceof AnnotatingPass) {
final AnnotatingPass annotatingPass = (AnnotatingPass) compileTimePass;
this.annotatedExecutionProperties.add(annotatingPass.getExecutionPropertyToModify());
- if (strictPrerequisiteCheckMode
- &&
finalizedExecutionProperties.contains(annotatingPass.getExecutionPropertyToModify()))
{
- throw new
CompileTimeOptimizationException(annotatingPass.getExecutionPropertyToModify()
- + " should have already been finalized.");
- }
}
-
finalizedExecutionProperties.addAll(compileTimePass.getPrerequisiteExecutionProperties());
this.compileTimePasses.add(compileTimePass);
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
index caa29d9..4528f70 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/TransientResourcePolicy.java
@@ -29,7 +29,7 @@ import org.apache.reef.tang.Injector;
*/
public final class TransientResourcePolicy implements Policy {
public static final PolicyBuilder BUILDER =
- new PolicyBuilder(true)
+ new PolicyBuilder()
.registerCompileTimePass(new TransientResourceCompositePass())
.registerCompileTimePass(new LoopOptimizationCompositePass())
.registerCompileTimePass(new DefaultCompositePass());
diff --git
a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
new file mode 100644
index 0000000..2b2c7b7
--- /dev/null
+++
b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.snu.nemo.compiler.optimizer.policy;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
+import edu.snu.nemo.common.test.EmptyComponents;
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public final class PolicyImplTest {
+ private DAG dag;
+
+ @Before
+ public void setUp() {
+ this.dag = EmptyComponents.buildEmptyDAG();
+ }
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testTransientPolicy() throws Exception {
+ // this should run without an exception.
+ TransientResourcePolicy.BUILDER.build().runCompileTimeOptimization(dag,
DAG.EMPTY_DAG_DIRECTORY);
+ }
+
+ @Test
+ public void testDisaggregationPolicy() throws Exception {
+ // this should run without an exception.
+ DisaggregationPolicy.BUILDER.build().runCompileTimeOptimization(dag,
DAG.EMPTY_DAG_DIRECTORY);
+ }
+
+ @Test
+ public void testDataSkewPolicy() throws Exception {
+ // this should run without an exception.
+ DataSkewPolicy.BUILDER.build().runCompileTimeOptimization(dag,
DAG.EMPTY_DAG_DIRECTORY);
+ }
+
+ @Test
+ public void testLargeShufflePolicy() throws Exception {
+ // this should run without an exception.
+ LargeShufflePolicy.BUILDER.build().runCompileTimeOptimization(dag,
DAG.EMPTY_DAG_DIRECTORY);
+ }
+
+ @Test
+ public void testTransientAndLargeShuffleCombination() throws Exception {
+ final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+ final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+
compileTimePasses.addAll(TransientResourcePolicy.BUILDER.getCompileTimePasses());
+ runtimePasses.addAll(TransientResourcePolicy.BUILDER.getRuntimePasses());
+
compileTimePasses.addAll(LargeShufflePolicy.BUILDER.getCompileTimePasses());
+ runtimePasses.addAll(LargeShufflePolicy.BUILDER.getRuntimePasses());
+
+ final Policy combinedPolicy = new PolicyImpl(compileTimePasses,
runtimePasses);
+
+ // This should NOT throw an exception and work well together.
+ combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+ }
+
+ @Test
+ public void testTransientAndDisaggregationCombination() throws Exception {
+ final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+ final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+
compileTimePasses.addAll(TransientResourcePolicy.BUILDER.getCompileTimePasses());
+ runtimePasses.addAll(TransientResourcePolicy.BUILDER.getRuntimePasses());
+
compileTimePasses.addAll(DisaggregationPolicy.BUILDER.getCompileTimePasses());
+ runtimePasses.addAll(DisaggregationPolicy.BUILDER.getRuntimePasses());
+
+ final Policy combinedPolicy = new PolicyImpl(compileTimePasses,
runtimePasses);
+
+ // This should throw an exception.
+ // Not all data store should be transferred from and to the GFS.
+ expectedException.expect(CompileTimeOptimizationException.class);
+ combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+ }
+
+ @Test
+ public void testDataSkewAndLargeShuffleCombination() throws Exception {
+ final List<CompileTimePass> compileTimePasses = new ArrayList<>();
+ final List<RuntimePass<?>> runtimePasses = new ArrayList<>();
+ compileTimePasses.addAll(DataSkewPolicy.BUILDER.getCompileTimePasses());
+ runtimePasses.addAll(DataSkewPolicy.BUILDER.getRuntimePasses());
+
compileTimePasses.addAll(LargeShufflePolicy.BUILDER.getCompileTimePasses());
+ runtimePasses.addAll(LargeShufflePolicy.BUILDER.getRuntimePasses());
+
+ final Policy combinedPolicy = new PolicyImpl(compileTimePasses,
runtimePasses);
+
+ // This should throw an exception.
+ // DataSizeMetricCollection is not compatible with Push (All data have to
be stored before the data collection).
+ expectedException.expect(CompileTimeOptimizationException.class);
+ combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+ }
+}
diff --git
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
index bf348e0..70b3275 100644
---
a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
+++
b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java
@@ -102,7 +102,7 @@ public final class DefaultScheduleGroupPassTest {
dagBuilder.addVertex(vertex);
}
for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
- edge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+ edge.setProperty(DataFlowProperty.of(dataFlowModel));
dagBuilder.connectVertices(edge);
}
return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
@@ -142,7 +142,7 @@ public final class DefaultScheduleGroupPassTest {
dagBuilder.addVertex(vertex);
}
for (final IREdge edge : Arrays.asList(e0, e1, e2, e3, e4)) {
- edge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+ edge.setProperty(DataFlowProperty.of(dataFlowModel));
dagBuilder.connectVertices(edge);
}
return Pair.of(dagBuilder.buildWithoutSourceSinkCheck(), vertices);
@@ -275,7 +275,7 @@ public final class DefaultScheduleGroupPassTest {
final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag
= generateJoinDAG(CommunicationPatternProperty.Value.OneToOne,
DataFlowProperty.Value.Push);
dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next()
-
.getExecutionProperties().put(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+ .setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
pass.apply(dag.left());
final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0));
final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2));
diff --git
a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
index 31ac261..618b89a 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/MRJava.java
@@ -70,7 +70,7 @@ public final class MRJava {
@Test(timeout = TIMEOUT)
public void testSparkWordAndLineCount() throws Exception {
final String inputFileName = "test_input_wordcount_spark";
- final String outputFileName = "test_output_wordcount_spark";
+ final String outputFileName = "test_output_word_and_line_count";
final String expectedOutputFilename =
"expected_output_word_and_line_count";
final String inputFilePath = fileBasePath + inputFileName;
final String outputFilePath = fileBasePath + outputFileName;
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 364a938..ca4124b 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -299,8 +299,7 @@ public final class PhysicalPlanGenerator implements
Function<DAG<IRVertex, IREdg
dag.topologicalDo(stage -> {
final int scheduleGroup = stageToScheduleGroupMap.get(stage);
stage.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
- stage.getIRDAG().topologicalDo(vertex -> vertex.getExecutionProperties()
- .put(ScheduleGroupProperty.of(scheduleGroup)));
+ stage.getIRDAG().topologicalDo(vertex ->
vertex.setProperty(ScheduleGroupProperty.of(scheduleGroup)));
});
}
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index d6926e4..abe0a8f 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -29,7 +29,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* A function that is responsible for stage partitioning on IR DAG.
@@ -119,7 +118,8 @@ public final class StagePartitioner implements
Function<DAG<IRVertex, IREdge>, M
* @return set of stage-level properties for the stage
*/
public Set<VertexExecutionProperty> getStageProperties(final IRVertex
vertex) {
- final Stream<VertexExecutionProperty> stream =
vertex.getExecutionProperties().stream();
- return stream.filter(p ->
!ignoredPropertyKeys.contains(p.getClass())).collect(Collectors.toSet());
+ return vertex.getExecutionProperties().stream()
+ .filter(p -> !ignoredPropertyKeys.contains(p.getClass()))
+ .collect(Collectors.toSet());
}
}
diff --git
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
index 8f3c957..2c46194 100644
---
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
+++
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java
@@ -66,8 +66,8 @@ public final class PhysicalPlanGeneratorTest {
private static final IRVertex newIRVertex(final int scheduleGroup, final int
parallelism) {
final IRVertex irVertex = new OperatorVertex(EMPTY_TRANSFORM);
-
irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
- irVertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
+ irVertex.setProperty(ScheduleGroupProperty.of(scheduleGroup));
+ irVertex.setProperty(ParallelismProperty.of(parallelism));
return irVertex;
}
@@ -75,7 +75,7 @@ public final class PhysicalPlanGeneratorTest {
final
CommunicationPatternProperty.Value communicationPattern,
final DataFlowProperty.Value
dataFlowModel) {
final IREdge irEdge = new IREdge(communicationPattern, src, dst);
- irEdge.getExecutionProperties().put(DataFlowProperty.of(dataFlowModel));
+ irEdge.setProperty(DataFlowProperty.of(dataFlowModel));
return irEdge;
}
}
diff --git
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
index 5c26e45..d7eedcd 100644
---
a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
+++
b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -58,9 +58,9 @@ public final class StagePartitionerTest {
private static IRVertex newVertex(final int parallelism, final int
scheduleGroup,
final List<VertexExecutionProperty>
otherProperties) {
final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM);
- vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism));
-
vertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup));
- otherProperties.forEach(property ->
vertex.getExecutionProperties().put(property));
+ vertex.setProperty(ParallelismProperty.of(parallelism));
+ vertex.setProperty(ScheduleGroupProperty.of(scheduleGroup));
+ otherProperties.forEach(property -> vertex.setProperty(property));
return vertex;
}
diff --git
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index f309e49..c755543 100644
---
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -481,13 +481,11 @@ public final class DataTransferTest {
// Src setup
final SourceVertex srcVertex = new
EmptyComponents.EmptySourceVertex("Source");
- final ExecutionPropertyMap srcVertexProperties =
srcVertex.getExecutionProperties();
- srcVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+ srcVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
// Dst setup
final SourceVertex dstVertex = new
EmptyComponents.EmptySourceVertex("Destination");
- final ExecutionPropertyMap dstVertexProperties =
dstVertex.getExecutionProperties();
- dstVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+ dstVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
return Pair.of(srcVertex, dstVertex);
}
@@ -503,13 +501,11 @@ public final class DataTransferTest {
// Src setup
final SourceVertex srcVertex = new
EmptyComponents.EmptySourceVertex("Source");
- final ExecutionPropertyMap srcVertexProperties =
srcVertex.getExecutionProperties();
- srcVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+ srcVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
// Dst setup
final SourceVertex dstVertex = new
EmptyComponents.EmptySourceVertex("Destination");
- final ExecutionPropertyMap dstVertexProperties =
dstVertex.getExecutionProperties();
- dstVertexProperties.put(ParallelismProperty.of(PARALLELISM_TEN));
+ dstVertex.setProperty(ParallelismProperty.of(PARALLELISM_TEN));
return Pair.of(srcVertex, dstVertex);
}