This is an automated email from the ASF dual-hosted git repository.
sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 1fc9791 [NEMO-192] Remove IRDAG from PhysicalPlan (#108)
1fc9791 is described below
commit 1fc979162b493a72b2e1898bc189aa4c35d4b87d
Author: Jeongyoon Eo <[email protected]>
AuthorDate: Mon Aug 20 00:31:17 2018 +0900
[NEMO-192] Remove IRDAG from PhysicalPlan (#108)
JIRA: [NEMO-192: Remove IRDAG from
PhysicalPlan](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-192)
**Major changes:**
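- Removed the IR DAG (the `irDAG` field and `getIrDAG()`) from `PhysicalPlan`. Runtime passes now take and return a `PhysicalPlan`. The per-task key ranges used for data skew handling are now stored on `StageEdge` instead of the IR edge's `DataSkewMetricProperty`.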
**Minor changes to note:**
- N/A
**Tests for the changes:**
- N/A
**Other comments:**
- Dynamic optimization is now performed through `StageEdge`. We still need a revised, IR-based dynamic optimization that accounts for RDD caching and for reshaping the IR DAG at runtime. This will be handled in
[NEMO-193](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-193).
Resolves
[NEMO-192](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-192)
---
.../nemo/compiler/backend/nemo/NemoBackend.java | 2 +-
.../eventhandler/DynamicOptimizationEvent.java | 8 +--
.../DynamicOptimizationEventHandler.java | 4 +-
.../runtime/common/optimizer/RunTimeOptimizer.java | 32 +++--------
.../pass/runtime/DataSkewRuntimePass.java | 37 ++++++-------
.../common/optimizer/pass/runtime/RuntimePass.java | 6 +--
.../snu/nemo/runtime/common/plan/PhysicalPlan.java | 11 ----
.../snu/nemo/runtime/common/plan/StageEdge.java | 62 ++++++++++++++++++++++
.../runtime/executor/datatransfer/InputReader.java | 7 +--
.../runtime/master/scheduler/BatchScheduler.java | 23 +++-----
.../runtime/common/plan/TestPlanGenerator.java | 2 +-
11 files changed, 108 insertions(+), 86 deletions(-)
diff --git
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 4789a5c..696acff 100644
---
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -61,6 +61,6 @@ public final class NemoBackend implements
Backend<PhysicalPlan> {
public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
final PhysicalPlanGenerator
physicalPlanGenerator) {
final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
- return new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), irDAG,
stageDAG);
+ return new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(),
stageDAG);
}
}
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index 9a8b26a..341f0d1 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -19,8 +19,8 @@
package edu.snu.nemo.runtime.common.eventhandler;
import edu.snu.nemo.common.eventhandler.RuntimeEvent;
-import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
/**
* An event for triggering dynamic optimization.
@@ -30,7 +30,7 @@ public final class DynamicOptimizationEvent implements
RuntimeEvent {
private final Object dynOptData;
private final String taskId;
private final String executorId;
- private final IREdge targetEdge;
+ private final StageEdge targetEdge;
/**
* Default constructor.
@@ -42,7 +42,7 @@ public final class DynamicOptimizationEvent implements
RuntimeEvent {
final Object dynOptData,
final String taskId,
final String executorId,
- final IREdge targetEdge) {
+ final StageEdge targetEdge) {
this.physicalPlan = physicalPlan;
this.taskId = taskId;
this.dynOptData = dynOptData;
@@ -75,7 +75,7 @@ public final class DynamicOptimizationEvent implements
RuntimeEvent {
return this.dynOptData;
}
- public IREdge getTargetEdge() {
+ public StageEdge getTargetEdge() {
return this.targetEdge;
}
}
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index a642248..9cd4ca2 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -20,9 +20,9 @@ package edu.snu.nemo.runtime.common.eventhandler;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.runtime.common.optimizer.RunTimeOptimizer;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import org.apache.reef.wake.impl.PubSubEventHandler;
import javax.inject.Inject;
@@ -51,7 +51,7 @@ public final class DynamicOptimizationEventHandler implements
RuntimeEventHandle
public void onNext(final DynamicOptimizationEvent dynamicOptimizationEvent) {
final PhysicalPlan physicalPlan =
dynamicOptimizationEvent.getPhysicalPlan();
final Object dynOptData = dynamicOptimizationEvent.getDynOptData();
- final IREdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
+ final StageEdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
final PhysicalPlan newPlan =
RunTimeOptimizer.dynamicOptimization(physicalPlan, dynOptData, targetEdge);
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index d9ca024..9611326 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -16,17 +16,9 @@
package edu.snu.nemo.runtime.common.optimizer;
import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdManager;
import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.common.plan.StageEdge;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
import java.util.*;
@@ -48,22 +40,12 @@ public final class RunTimeOptimizer {
public static synchronized PhysicalPlan dynamicOptimization(
final PhysicalPlan originalPlan,
final Object dynOptData,
- final IREdge targetEdge) {
- try {
- final PhysicalPlanGenerator physicalPlanGenerator =
-
Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
-
- // Data for dynamic optimization used in DataSkewRuntimePass
- // is a map of <hash value, partition size>.
- final DAG<IRVertex, IREdge> newIrDAG =
- new DataSkewRuntimePass()
- .apply(originalPlan.getIrDAG(), Pair.of(targetEdge,
(Map<Integer, Long>) dynOptData));
- final DAG<Stage, StageEdge> stageDAG =
physicalPlanGenerator.apply(newIrDAG);
- final PhysicalPlan physicalPlan =
- new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(),
newIrDAG, stageDAG);
- return physicalPlan;
- } catch (final InjectionException e) {
- throw new RuntimeException(e);
- }
+ final StageEdge targetEdge) {
+ // Data for dynamic optimization used in DataSkewRuntimePass
+ // is a map of <hash value, partition size>.
+ final PhysicalPlan physicalPlan =
+ new DataSkewRuntimePass()
+ .apply(originalPlan, Pair.of(targetEdge, (Map<Integer, Long>)
dynOptData));
+ return physicalPlan;
}
}
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 67afbad..572cbd1 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -16,19 +16,17 @@
package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.common.DataSkewMetricFactory;
import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.common.HashRange;
import
edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +39,7 @@ import java.util.stream.Collectors;
* this RuntimePass identifies a number of keys with big partition
sizes(skewed key)
* and evenly redistributes data via overwriting incoming edges of destination
tasks.
*/
-public final class DataSkewRuntimePass extends RuntimePass<Pair<IREdge,
Map<Integer, Long>>> {
+public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge,
Map<Integer, Long>>> {
private static final Logger LOG =
LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
// Skewed keys denote for top n keys in terms of partition size.
@@ -72,16 +70,9 @@ public final class DataSkewRuntimePass extends
RuntimePass<Pair<IREdge, Map<Inte
}
@Override
- public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG,
- final Pair<IREdge, Map<Integer, Long>>
metricData) {
- // get edges to optimize
- final List<IREdge> optimizationEdges = irDAG.getVertices().stream()
- .flatMap(v -> irDAG.getIncomingEdgesOf(v).stream())
- .filter(e ->
Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
- .equals(e.getPropertyValue(MetricCollectionProperty.class)))
- .collect(Collectors.toList());
-
- final IREdge targetEdge = metricData.left();
+ public PhysicalPlan apply(final PhysicalPlan originalPlan,
+ final Pair<StageEdge, Map<Integer, Long>>
metricData) {
+ final StageEdge targetEdge = metricData.left();
// Get number of evaluators of the next stage (number of blocks).
final Integer dstParallelism =
targetEdge.getDst().getPropertyValue(ParallelismProperty.class).get();
@@ -91,9 +82,19 @@ public final class DataSkewRuntimePass extends
RuntimePass<Pair<IREdge, Map<Inte
for (int i = 0; i < dstParallelism; i++) {
taskIdxToKeyRange.put(i, keyRanges.get(i));
}
+
// Overwrite the previously assigned key range in the physical DAG with
the new range.
- targetEdge.setProperty(DataSkewMetricProperty.of(new
DataSkewMetricFactory(taskIdxToKeyRange)));
- return irDAG;
+ final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
+ for (Stage stage : stageDAG.getVertices()) {
+ List<StageEdge> stageEdges = stageDAG.getOutgoingEdgesOf(stage);
+ for (StageEdge edge : stageEdges) {
+ if (edge.equals(targetEdge)) {
+ edge.setTaskIdxToKeyRange(taskIdxToKeyRange);
+ }
+ }
+ }
+
+ return new PhysicalPlan(originalPlan.getId(), stageDAG);
}
public List<Integer> identifySkewedKeys(final Map<Integer, Long>
keyValToPartitionSizeMap) {
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 5a8471f..f30403f 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -15,11 +15,9 @@
*/
package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
-import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.pass.Pass;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import java.util.Set;
import java.util.function.BiFunction;
@@ -31,7 +29,7 @@ import java.util.function.BiFunction;
* @param <T> type of the metric data used for dynamic optimization.
*/
public abstract class RuntimePass<T> extends Pass
- implements BiFunction<DAG<IRVertex, IREdge>, T, DAG<IRVertex, IREdge>> {
+ implements BiFunction<PhysicalPlan, T, PhysicalPlan> {
/**
* @return the set of event handlers used with the runtime pass.
*/
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index 4286bca..30f7111 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -16,7 +16,6 @@
package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import java.io.Serializable;
@@ -28,7 +27,6 @@ import java.util.Map;
*/
public final class PhysicalPlan implements Serializable {
private final String id;
- private final DAG<IRVertex, IREdge> irDAG;
private final DAG<Stage, StageEdge> stageDAG;
private final Map<String, IRVertex> idToIRVertex;
@@ -39,10 +37,8 @@ public final class PhysicalPlan implements Serializable {
* @param stageDAG the DAG of stages.
*/
public PhysicalPlan(final String id,
- final DAG<IRVertex, IREdge> irDAG,
final DAG<Stage, StageEdge> stageDAG) {
this.id = id;
- this.irDAG = irDAG;
this.stageDAG = stageDAG;
idToIRVertex = new HashMap<>();
@@ -74,13 +70,6 @@ public final class PhysicalPlan implements Serializable {
return idToIRVertex;
}
- /**
- * @return IR DAG.
- */
- public DAG<IRVertex, IREdge> getIrDAG() {
- return irDAG;
- }
-
@Override
public String toString() {
return stageDAG.toString();
diff --git
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 5218b8f..ced7564 100644
---
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -16,11 +16,17 @@
package edu.snu.nemo.runtime.common.plan;
import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
import
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
/**
* Edge of a stage that connects an IRVertex of the source stage to an
IRVertex of the destination stage.
@@ -40,6 +46,11 @@ public final class StageEdge extends RuntimeEdge<Stage> {
private final IRVertex dstVertex;
/**
+ * The list between the task idx and key range to read.
+ */
+ private Map<Integer, KeyRange> taskIdxToKeyRange;
+
+ /**
* Value for {@link CommunicationPatternProperty}.
*/
private final CommunicationPatternProperty.Value
dataCommunicationPatternValue;
@@ -71,6 +82,11 @@ public final class StageEdge extends RuntimeEdge<Stage> {
super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
this.srcVertex = srcVertex;
this.dstVertex = dstVertex;
+ // Initialize the key range of each dst task.
+ this.taskIdxToKeyRange = new HashMap<>();
+ for (int taskIdx = 0; taskIdx < dstStage.getParallelism(); taskIdx++) {
+ taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1,
false));
+ }
this.dataCommunicationPatternValue =
edgeProperties.get(CommunicationPatternProperty.class)
.orElseThrow(() -> new RuntimeException(String.format(
"CommunicationPatternProperty not set for %s", runtimeEdgeId)));
@@ -110,6 +126,52 @@ public final class StageEdge extends RuntimeEdge<Stage> {
}
/**
+ * @return the list between the task idx and key range to read.
+ */
+ public Map<Integer, KeyRange> getTaskIdxToKeyRange() {
+ return taskIdxToKeyRange;
+ }
+
+ /**
+ * Sets the task idx to key range list.
+ *
+ * @param taskIdxToKeyRange the list to set.
+ */
+ public void setTaskIdxToKeyRange(final Map<Integer, KeyRange>
taskIdxToKeyRange) {
+ this.taskIdxToKeyRange = taskIdxToKeyRange;
+ }
+
+ /**
+ * @param edge edge to compare.
+ * @return whether or not the edge has the same itinerary
+ */
+ public Boolean hasSameItineraryAs(final StageEdge edge) {
+ return getSrc().equals(edge.getSrc()) && getDst().equals(edge.getDst());
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final StageEdge stageEdge = (StageEdge) o;
+ return getExecutionProperties().equals(stageEdge.getExecutionProperties())
&& hasSameItineraryAs(stageEdge);
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(getSrc().hashCode())
+ .append(getDst().hashCode())
+ .append(getExecutionProperties())
+ .toHashCode();
+ }
+
+ /**
* @return {@link CommunicationPatternProperty} value.
*/
public CommunicationPatternProperty.Value getDataCommunicationPattern() {
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index eadd8bf..7637660 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -16,8 +16,6 @@
package edu.snu.nemo.runtime.executor.datatransfer;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.common.DataSkewMetricFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.*;
import
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import
edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
@@ -117,9 +115,8 @@ public final class InputReader extends DataTransfer {
assert (runtimeEdge instanceof StageEdge);
final Optional<DataStoreProperty.Value> dataStoreProperty
= runtimeEdge.getPropertyValue(DataStoreProperty.class);
- final DataSkewMetricFactory metricFactory =
- (DataSkewMetricFactory)
runtimeEdge.getExecutionProperties().get(DataSkewMetricProperty.class).get();
- final KeyRange hashRangeToRead =
metricFactory.getMetric().get(dstTaskIndex);
+ ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+ final KeyRange hashRangeToRead = ((StageEdge)
runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
if (hashRangeToRead == null) {
throw new BlockFetchException(
new Throwable("The hash range to read is not assigned to " +
dstTaskIndex + "'th task"));
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 71e7d3f..243f739 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -19,9 +19,7 @@ import com.google.common.collect.Sets;
import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdManager;
import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
import edu.snu.nemo.runtime.common.plan.*;
@@ -330,27 +328,22 @@ public final class BatchScheduler implements Scheduler {
});
}
- private IREdge getEdgeToOptimize(final String taskId) {
+ private StageEdge getEdgeToOptimize(final String taskId) {
// Get a stage including the given task
final Stage stagePutOnHold =
physicalPlan.getStageDAG().getVertices().stream()
- .filter(stage ->
stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
- .findFirst()
- .orElseThrow(() -> new RuntimeException());
+ .filter(stage ->
stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException());
// Get outgoing edges of that stage with MetricCollectionProperty
List<StageEdge> stageEdges =
physicalPlan.getStageDAG().getOutgoingEdgesOf(stagePutOnHold);
- IREdge targetEdge = null;
for (StageEdge edge : stageEdges) {
- final IRVertex srcIRVertex = edge.getSrcIRVertex();
- final IRVertex dstIRVertex = edge.getDstIRVertex();
- targetEdge = physicalPlan.getIrDAG().getEdgeBetween(srcIRVertex.getId(),
dstIRVertex.getId());
- if (MetricCollectionProperty.Value.DataSkewRuntimePass
-
.equals(targetEdge.getPropertyValue(MetricCollectionProperty.class).get())) {
- break;
+ if
(edge.getExecutionProperties().containsKey(MetricCollectionProperty.class)) {
+ return edge;
}
}
- return targetEdge;
+ return null;
}
/**
@@ -370,7 +363,7 @@ public final class BatchScheduler implements Scheduler {
final boolean stageComplete =
planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
- final IREdge targetEdge = getEdgeToOptimize(taskId);
+ final StageEdge targetEdge = getEdgeToOptimize(taskId);
if (targetEdge == null) {
throw new RuntimeException("No edges specified for data skew
optimization");
}
diff --git
a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 3c23dcf..3b3646e 100644
---
a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++
b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -95,7 +95,7 @@ public final class TestPlanGenerator {
final Policy policy) throws
Exception {
final DAG<IRVertex, IREdge> optimized =
policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
- return new PhysicalPlan("TestPlan", irDAG, physicalDAG);
+ return new PhysicalPlan("TestPlan", physicalDAG);
}
/**