johnyangk commented on a change in pull request #99: [NEMO-64] Fix map stage hang under DataSkewPolicy
URL: https://github.com/apache/incubator-nemo/pull/99#discussion_r209152438
 
 

 ##########
 File path: runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
 ##########
 @@ -36,24 +42,25 @@ private RunTimeOptimizer() {
   /**
    * Dynamic optimization method to process the dag with an appropriate pass, decided by the stats.
    * @param originalPlan original physical execution plan.
-   * @param metricCollectionBarrierVertex the vertex that collects metrics and chooses which optimization to perform.
    * @return the newly updated optimized physical plan.
    */
   public static synchronized PhysicalPlan dynamicOptimization(
           final PhysicalPlan originalPlan,
-          final MetricCollectionBarrierVertex metricCollectionBarrierVertex) {
-    final DynamicOptimizationProperty.Value dynamicOptimizationType =
-        metricCollectionBarrierVertex.getPropertyValue(DynamicOptimizationProperty.class).get();
+          final Object metric) {
+    try {
+      final PhysicalPlanGenerator physicalPlanGenerator =
+          Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
 
-    switch (dynamicOptimizationType) {
-      case DataSkewRuntimePass:
-        // Metric data for DataSkewRuntimePass is a pair of blockIds and map of hashrange, partition size.
-        final Pair<List<String>, Map<Integer, Long>> metricData =
-            Pair.of(metricCollectionBarrierVertex.getBlockIds(),
-                (Map<Integer, Long>) metricCollectionBarrierVertex.getMetricData());
-        return new DataSkewRuntimePass().apply(originalPlan, metricData);
-      default:
-        throw new UnsupportedOperationException("Unknown runtime pass: " + dynamicOptimizationType);
+      // Metric data for DataSkewRuntimePass is a map of <hashrange, partition size>.
+      final Map<Integer, Long> metricData = (Map<Integer, Long>) metric;
+      final DAG<IRVertex, IREdge> newIrDAG =
+          new DataSkewRuntimePass().apply(originalPlan.getIrDAG(), metricData);
 
 Review comment:
   Add a TODO here?
   `DataSkewRuntimePass` is hard-coded, so any other runtime pass would require changing this method.
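
   To illustrate what such a TODO could point toward, here is a minimal sketch of selecting a pass by key instead of hard-coding it. `RuntimePassRegistry`, its methods, and the string keys are hypothetical names for illustration only, not part of Nemo; the plan and metric types are left generic rather than assuming anything about `DAG` or `PhysicalPlan`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Hypothetical sketch (not Nemo code): looks up a runtime pass by name
 * so the caller does not hard-code a single pass.
 *
 * @param <P> the plan type the passes transform.
 * @param <M> the metric type the passes consume.
 */
final class RuntimePassRegistry<P, M> {
  private final Map<String, BiFunction<P, M, P>> passes = new HashMap<>();

  /** Registers a pass under the given name, replacing any previous entry. */
  void register(final String name, final BiFunction<P, M, P> pass) {
    passes.put(name, pass);
  }

  /** Applies the named pass, or fails loudly if no such pass is registered. */
  P apply(final String name, final P plan, final M metric) {
    final BiFunction<P, M, P> pass = passes.get(name);
    if (pass == null) {
      throw new UnsupportedOperationException("Unknown runtime pass: " + name);
    }
    return pass.apply(plan, metric);
  }
}
```

   Failing on an unknown key keeps the behavior of the removed `default:` branch, which threw `UnsupportedOperationException` for unrecognized pass types.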

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
