sanha closed pull request #108: [NEMO-192] Remove IRDAG from PhysicalPlan
URL: https://github.com/apache/incubator-nemo/pull/108
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
 
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 4789a5c59..696acffd3 100644
--- 
a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ 
b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -61,6 +61,6 @@ public PhysicalPlan compile(final DAG<IRVertex, IREdge> 
irDAG) throws Exception
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator 
physicalPlanGenerator) {
     final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
-    return new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), irDAG, 
stageDAG);
+    return new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), 
stageDAG);
   }
 }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index 9a8b26a04..341f0d164 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -19,8 +19,8 @@
 package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.eventhandler.RuntimeEvent;
-import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 
 /**
  * An event for triggering dynamic optimization.
@@ -30,7 +30,7 @@
   private final Object dynOptData;
   private final String taskId;
   private final String executorId;
-  private final IREdge targetEdge;
+  private final StageEdge targetEdge;
 
   /**
    * Default constructor.
@@ -42,7 +42,7 @@ public DynamicOptimizationEvent(final PhysicalPlan 
physicalPlan,
                                   final Object dynOptData,
                                   final String taskId,
                                   final String executorId,
-                                  final IREdge targetEdge) {
+                                  final StageEdge targetEdge) {
     this.physicalPlan = physicalPlan;
     this.taskId = taskId;
     this.dynOptData = dynOptData;
@@ -75,7 +75,7 @@ public Object getDynOptData() {
     return this.dynOptData;
   }
 
-  public IREdge getTargetEdge() {
+  public StageEdge getTargetEdge() {
     return this.targetEdge;
   }
 }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index a642248cf..9cd4ca23c 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -20,9 +20,9 @@
 
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.optimizer.RunTimeOptimizer;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import org.apache.reef.wake.impl.PubSubEventHandler;
 
 import javax.inject.Inject;
@@ -51,7 +51,7 @@ private DynamicOptimizationEventHandler(final 
PubSubEventHandlerWrapper pubSubEv
   public void onNext(final DynamicOptimizationEvent dynamicOptimizationEvent) {
     final PhysicalPlan physicalPlan = 
dynamicOptimizationEvent.getPhysicalPlan();
     final Object dynOptData = dynamicOptimizationEvent.getDynOptData();
-    final IREdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
+    final StageEdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
 
     final PhysicalPlan newPlan = 
RunTimeOptimizer.dynamicOptimization(physicalPlan, dynOptData, targetEdge);
 
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index d9ca02411..9611326a1 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -16,17 +16,9 @@
 package edu.snu.nemo.runtime.common.optimizer;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
 
 import java.util.*;
 
@@ -48,22 +40,12 @@ private RunTimeOptimizer() {
   public static synchronized PhysicalPlan dynamicOptimization(
           final PhysicalPlan originalPlan,
           final Object dynOptData,
-          final IREdge targetEdge) {
-    try {
-      final PhysicalPlanGenerator physicalPlanGenerator =
-          
Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
-
-      // Data for dynamic optimization used in DataSkewRuntimePass
-      // is a map of <hash value, partition size>.
-      final DAG<IRVertex, IREdge> newIrDAG =
-          new DataSkewRuntimePass()
-              .apply(originalPlan.getIrDAG(), Pair.of(targetEdge, 
(Map<Integer, Long>) dynOptData));
-      final DAG<Stage, StageEdge> stageDAG = 
physicalPlanGenerator.apply(newIrDAG);
-      final PhysicalPlan physicalPlan =
-          new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), 
newIrDAG, stageDAG);
-      return physicalPlan;
-    } catch (final InjectionException e) {
-      throw new RuntimeException(e);
-    }
+          final StageEdge targetEdge) {
+    // Data for dynamic optimization used in DataSkewRuntimePass
+    // is a map of <hash value, partition size>.
+    final PhysicalPlan physicalPlan =
+      new DataSkewRuntimePass()
+        .apply(originalPlan, Pair.of(targetEdge, (Map<Integer, Long>) 
dynOptData));
+    return physicalPlan;
   }
 }
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 67afbad6a..572cbd167 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -16,19 +16,17 @@
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.common.DataSkewMetricFactory;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
 
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.common.HashRange;
 import 
edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +39,7 @@
  * this RuntimePass identifies a number of keys with big partition 
sizes(skewed key)
  * and evenly redistributes data via overwriting incoming edges of destination 
tasks.
  */
-public final class DataSkewRuntimePass extends RuntimePass<Pair<IREdge, 
Map<Integer, Long>>> {
+public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge, 
Map<Integer, Long>>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
   private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
   // Skewed keys denote for top n keys in terms of partition size.
@@ -72,16 +70,9 @@ public DataSkewRuntimePass setNumSkewedKeys(final int 
numOfSkewedKeys) {
   }
 
   @Override
-  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG,
-                                     final Pair<IREdge, Map<Integer, Long>> 
metricData) {
-    // get edges to optimize
-    final List<IREdge> optimizationEdges = irDAG.getVertices().stream()
-        .flatMap(v -> irDAG.getIncomingEdgesOf(v).stream())
-        .filter(e -> 
Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
-            .equals(e.getPropertyValue(MetricCollectionProperty.class)))
-        .collect(Collectors.toList());
-
-    final IREdge targetEdge = metricData.left();
+  public PhysicalPlan apply(final PhysicalPlan originalPlan,
+                            final Pair<StageEdge, Map<Integer, Long>> 
metricData) {
+    final StageEdge targetEdge = metricData.left();
     // Get number of evaluators of the next stage (number of blocks).
     final Integer dstParallelism = 
targetEdge.getDst().getPropertyValue(ParallelismProperty.class).get();
 
@@ -91,9 +82,19 @@ public DataSkewRuntimePass setNumSkewedKeys(final int 
numOfSkewedKeys) {
     for (int i = 0; i < dstParallelism; i++) {
       taskIdxToKeyRange.put(i, keyRanges.get(i));
     }
+
     // Overwrite the previously assigned key range in the physical DAG with 
the new range.
-    targetEdge.setProperty(DataSkewMetricProperty.of(new 
DataSkewMetricFactory(taskIdxToKeyRange)));
-    return irDAG;
+    final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
+    for (Stage stage : stageDAG.getVertices()) {
+      List<StageEdge> stageEdges = stageDAG.getOutgoingEdgesOf(stage);
+      for (StageEdge edge : stageEdges) {
+        if (edge.equals(targetEdge)) {
+          edge.setTaskIdxToKeyRange(taskIdxToKeyRange);
+        }
+      }
+    }
+
+    return new PhysicalPlan(originalPlan.getId(), stageDAG);
   }
 
   public List<Integer> identifySkewedKeys(final Map<Integer, Long> 
keyValToPartitionSizeMap) {
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 5a8471fba..f30403f84 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -15,11 +15,9 @@
  */
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.pass.Pass;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -31,7 +29,7 @@
  * @param <T> type of the metric data used for dynamic optimization.
  */
 public abstract class RuntimePass<T> extends Pass
-    implements BiFunction<DAG<IRVertex, IREdge>, T, DAG<IRVertex, IREdge>> {
+    implements BiFunction<PhysicalPlan, T, PhysicalPlan> {
   /**
    * @return the set of event handlers used with the runtime pass.
    */
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index 4286bca0c..30f71110d 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -16,7 +16,6 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.Serializable;
@@ -28,7 +27,6 @@
  */
 public final class PhysicalPlan implements Serializable {
   private final String id;
-  private final DAG<IRVertex, IREdge> irDAG;
   private final DAG<Stage, StageEdge> stageDAG;
   private final Map<String, IRVertex> idToIRVertex;
 
@@ -39,10 +37,8 @@
    * @param stageDAG        the DAG of stages.
    */
   public PhysicalPlan(final String id,
-                      final DAG<IRVertex, IREdge> irDAG,
                       final DAG<Stage, StageEdge> stageDAG) {
     this.id = id;
-    this.irDAG = irDAG;
     this.stageDAG = stageDAG;
 
     idToIRVertex = new HashMap<>();
@@ -74,13 +70,6 @@ public String getId() {
     return idToIRVertex;
   }
 
-  /**
-   * @return IR DAG.
-   */
-  public DAG<IRVertex, IREdge> getIrDAG() {
-    return irDAG;
-  }
-
   @Override
   public String toString() {
     return stageDAG.toString();
diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 5218b8fea..ced756457 100644
--- 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ 
b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -16,11 +16,17 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Edge of a stage that connects an IRVertex of the source stage to an 
IRVertex of the destination stage.
@@ -39,6 +45,11 @@
    */
   private final IRVertex dstVertex;
 
+  /**
+   * The list between the task idx and key range to read.
+   */
+  private Map<Integer, KeyRange> taskIdxToKeyRange;
+
   /**
    * Value for {@link CommunicationPatternProperty}.
    */
@@ -71,6 +82,11 @@ public StageEdge(final String runtimeEdgeId,
     super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
+    // Initialize the key range of each dst task.
+    this.taskIdxToKeyRange = new HashMap<>();
+    for (int taskIdx = 0; taskIdx < dstStage.getParallelism(); taskIdx++) {
+      taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1, 
false));
+    }
     this.dataCommunicationPatternValue = 
edgeProperties.get(CommunicationPatternProperty.class)
         .orElseThrow(() -> new RuntimeException(String.format(
             "CommunicationPatternProperty not set for %s", runtimeEdgeId)));
@@ -109,6 +125,52 @@ public String toString() {
     return propertiesToJSON();
   }
 
+  /**
+   * @return the list between the task idx and key range to read.
+   */
+  public Map<Integer, KeyRange> getTaskIdxToKeyRange() {
+    return taskIdxToKeyRange;
+  }
+
+  /**
+   * Sets the task idx to key range list.
+   *
+   * @param taskIdxToKeyRange the list to set.
+   */
+  public void setTaskIdxToKeyRange(final Map<Integer, KeyRange> 
taskIdxToKeyRange) {
+    this.taskIdxToKeyRange = taskIdxToKeyRange;
+  }
+
+  /**
+   * @param edge edge to compare.
+   * @return whether or not the edge has the same itinerary
+   */
+  public Boolean hasSameItineraryAs(final StageEdge edge) {
+    return getSrc().equals(edge.getSrc()) && getDst().equals(edge.getDst());
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final StageEdge stageEdge = (StageEdge) o;
+    return getExecutionProperties().equals(stageEdge.getExecutionProperties()) 
&& hasSameItineraryAs(stageEdge);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+      .append(getSrc().hashCode())
+      .append(getDst().hashCode())
+      .append(getExecutionProperties())
+      .toHashCode();
+  }
+
   /**
    * @return {@link CommunicationPatternProperty} value.
    */
diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index eadd8bf50..763766004 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -16,8 +16,6 @@
 package edu.snu.nemo.runtime.executor.datatransfer;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.common.DataSkewMetricFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import 
edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
@@ -117,9 +115,8 @@ public InputReader(final int dstTaskIndex,
     assert (runtimeEdge instanceof StageEdge);
     final Optional<DataStoreProperty.Value> dataStoreProperty
         = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-    final DataSkewMetricFactory metricFactory =
-        (DataSkewMetricFactory) 
runtimeEdge.getExecutionProperties().get(DataSkewMetricProperty.class).get();
-    final KeyRange hashRangeToRead = 
metricFactory.getMetric().get(dstTaskIndex);
+    ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+    final KeyRange hashRangeToRead = ((StageEdge) 
runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
       throw new BlockFetchException(
           new Throwable("The hash range to read is not assigned to " + 
dstTaskIndex + "'th task"));
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 71e7d3fa7..243f7396c 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -19,9 +19,7 @@
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
@@ -330,27 +328,22 @@ private void onTaskExecutionComplete(final String 
executorId,
     });
   }
 
-  private IREdge getEdgeToOptimize(final String taskId) {
+  private StageEdge getEdgeToOptimize(final String taskId) {
     // Get a stage including the given task
     final Stage stagePutOnHold = 
physicalPlan.getStageDAG().getVertices().stream()
-        .filter(stage -> 
stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
-        .findFirst()
-        .orElseThrow(() -> new RuntimeException());
+      .filter(stage -> 
stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
+      .findFirst()
+      .orElseThrow(() -> new RuntimeException());
 
     // Get outgoing edges of that stage with MetricCollectionProperty
     List<StageEdge> stageEdges = 
physicalPlan.getStageDAG().getOutgoingEdgesOf(stagePutOnHold);
-    IREdge targetEdge = null;
     for (StageEdge edge : stageEdges) {
-      final IRVertex srcIRVertex = edge.getSrcIRVertex();
-      final IRVertex dstIRVertex = edge.getDstIRVertex();
-      targetEdge = physicalPlan.getIrDAG().getEdgeBetween(srcIRVertex.getId(), 
dstIRVertex.getId());
-      if (MetricCollectionProperty.Value.DataSkewRuntimePass
-          
.equals(targetEdge.getPropertyValue(MetricCollectionProperty.class).get())) {
-        break;
+      if 
(edge.getExecutionProperties().containsKey(MetricCollectionProperty.class)) {
+        return edge;
       }
     }
 
-    return targetEdge;
+    return null;
   }
 
   /**
@@ -370,7 +363,7 @@ private void onTaskExecutionOnHold(final String executorId,
     final boolean stageComplete =
         
planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
 
-    final IREdge targetEdge = getEdgeToOptimize(taskId);
+    final StageEdge targetEdge = getEdgeToOptimize(taskId);
     if (targetEdge == null) {
       throw new RuntimeException("No edges specified for data skew 
optimization");
     }
diff --git 
a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
 
b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 3c23dcfec..3b3646eb1 100644
--- 
a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++ 
b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -95,7 +95,7 @@ private static PhysicalPlan convertIRToPhysical(final 
DAG<IRVertex, IREdge> irDA
                                                   final Policy policy) throws 
Exception {
     final DAG<IRVertex, IREdge> optimized = 
policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
     final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
-    return new PhysicalPlan("TestPlan", irDAG, physicalDAG);
+    return new PhysicalPlan("TestPlan", physicalDAG);
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to