jeongyooneo closed pull request #38: [NEMO-48] Do not merge broadcasted data into a single partition
URL: https://github.com/apache/incubator-nemo/pull/38
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index 3b586374..2d90388d 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -16,8 +16,6 @@
package edu.snu.nemo.common.dag;
import edu.snu.nemo.common.ir.edge.IREdge;
-import
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-//import
edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.OperatorVertex;
@@ -243,15 +241,6 @@ private void sinkCheck() {
* Helper method to check that all execution properties are correct and
makes sense.
*/
private void executionPropertyCheck() {
- // SideInput edge must be broadcast
- vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e
instanceof IREdge).map(e -> (IREdge) e)
- .filter(e -> Boolean.TRUE.equals(e.isSideInput()))
- .filter(e ->
!(e.getProperty(ExecutionProperty.Key.DataCommunicationPattern))
- .equals(DataCommunicationPatternProperty.Value.BroadCast))
- .forEach(e -> {
- throw new RuntimeException("DAG execution property check: "
- + "SideInput edge must be broadcast: " + e.getId());
- }));
// SideInput is not compatible with Push
vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e
instanceof IREdge).map(e -> (IREdge) e)
.filter(e -> Boolean.TRUE.equals(e.isSideInput()))
diff --git
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index e3072855..bf3253be 100644
---
a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++
b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -263,7 +263,7 @@ private static BeamCoder getCoderForView(final ViewFn
viewFn, final Coder beamIn
if (dstTransform instanceof GroupByKeyTransform) {
return DataCommunicationPatternProperty.Value.Shuffle;
}
- if (srcTransform instanceof CreateViewTransform || dstTransform instanceof
CreateViewTransform) {
+ if (dstTransform instanceof CreateViewTransform) {
return DataCommunicationPatternProperty.Value.BroadCast;
}
return DataCommunicationPatternProperty.Value.OneToOne;
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
index adf5218b..6b90111c 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
@@ -78,7 +78,7 @@ public DefaultParallelismPass(final int
desiredSourceParallelism,
// No reason to propagate via Broadcast edges, as the data streams
that will use the broadcasted data
// as a sideInput will have their own number of parallelism
final Integer o2oParallelism = inEdges.stream()
- .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne
+ .filter(edge -> DataCommunicationPatternProperty.Value.OneToOne
.equals(edge.getProperty(ExecutionProperty.Key.DataCommunicationPattern)))
.mapToInt(edge ->
edge.getSrc().getProperty(ExecutionProperty.Key.Parallelism))
.max().orElse(1);
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
index a89fa6a1..e9c256c8 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultStagePartitioningPass.java
@@ -79,7 +79,7 @@ public DefaultStagePartitioningPass() {
final Optional<List<IREdge>> inEdgeList = (inEdges == null ||
inEdges.isEmpty())
? Optional.empty() : Optional.of(inEdges);
- if (!inEdgeList.isPresent() || inEdgeList.get().size() > 1) { // If
Source vertex or has multiple inEdges
+ if (!inEdgeList.isPresent()) { // If Source vertex
createNewStage(vertex, vertexStageNumHashMap, stageNumber,
vertexListForEachStage);
} else {
// Filter candidate incoming edges that can be included in a stage
with the vertex.
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 696df976..fae871a7 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -186,7 +186,8 @@ private void handleDuplicateEdgeGroupProperty(final
DAG<Stage, StageEdge> dagOfS
irEdge.getId(),
irEdge.getExecutionProperties(),
irEdge.getSrc(),
- irEdge.getDst()));
+ irEdge.getDst(),
+ irEdge.isSideInput()));
} else { // edge comes from another stage
final Stage srcStage = vertexStageMap.get(srcVertex);
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
index a2c52a7f..e1a11acc 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/RuntimeEdge.java
@@ -28,22 +28,6 @@
private final ExecutionPropertyMap executionProperties;
private final Boolean isSideInput;
- /**
- * Constructs the edge given the below parameters.
- * This constructor assumes that this edge is not for a side input.
- *
- * @param runtimeEdgeId the id of this edge.
- * @param executionProperties to control the data flow on this edge.
- * @param src the source vertex.
- * @param dst the destination vertex.
- */
- public RuntimeEdge(final String runtimeEdgeId,
- final ExecutionPropertyMap executionProperties,
- final V src,
- final V dst) {
- this(runtimeEdgeId, executionProperties, src, dst, false);
- }
-
/**
* Constructs the edge given the below parameters.
*
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 2abb3b7f..61a371f5 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -50,9 +50,8 @@
final InputReader readerForParentTask,
final VertexHarness child,
final Map<String, Object> metricMap,
- final boolean isFromSideInput,
final boolean isToSideInput) {
- super(dataSource, child, metricMap, isFromSideInput, isToSideInput);
+ super(dataSource, child, metricMap,
readerForParentTask.isSideInputReader(), isToSideInput);
this.readersForParentTask = readerForParentTask;
this.hasFetchStarted = false;
this.dataQueue = new LinkedBlockingQueue<>();
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 998df63e..116b9c41 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -35,9 +35,8 @@
final Readable readable,
final VertexHarness child,
final Map<String, Object> metricMap,
- final boolean isFromSideInput,
final boolean isToSideInput) {
- super(dataSource, child, metricMap, isFromSideInput, isToSideInput);
+ super(dataSource, child, metricMap, false, isToSideInput);
this.readable = readable;
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 764dddf6..f8ef425e 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -153,15 +153,14 @@ public TaskExecutor(final Task task,
// Handle reads
final boolean isToSideInput = isToSideInputs.stream().anyMatch(bool ->
bool);
if (irVertex instanceof SourceVertex) {
- dataFetcherList.add(new SourceVertexDataFetcher(irVertex,
sourceReader.get(), vertexHarness, metricMap,
- false, isToSideInput)); // Source vertex read
+ dataFetcherList.add(new SourceVertexDataFetcher(
+ irVertex, sourceReader.get(), vertexHarness, metricMap,
isToSideInput)); // Source vertex read
}
final List<InputReader> parentTaskReaders =
getParentTaskReaders(taskIndex, irVertex,
task.getTaskIncomingEdges(), dataTransferFactory);
parentTaskReaders.forEach(parentTaskReader -> {
- final boolean isFromSideInput = parentTaskReader.isSideInputReader();
dataFetcherList.add(new
ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
- vertexHarness, metricMap, isFromSideInput, isToSideInput)); //
Parent-task read
+ vertexHarness, metricMap, isToSideInput)); // Parent-task read
});
});
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services