wonook commented on a change in pull request #111: [NEMO-139, 6] Logic in the scheduler for appending jobs, Support RDD caching
URL: https://github.com/apache/incubator-nemo/pull/111#discussion_r211161433
 
 

 ##########
 File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
 ##########
 @@ -92,48 +95,75 @@ private BatchScheduler(final TaskDispatcher taskDispatcher,
           .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
     }
     this.executorRegistry = executorRegistry;
+    this.planStateManager = planStateManager;
     this.dynOptDataHandlers = new ArrayList<>();
     dynOptDataHandlers.add(new DataSkewDynOptDataHandler());
   }
 
   /**
+   * Schedules a given plan.
+   * If multiple physical plans are submitted, they will be appended and handled as a single plan.
+   * TODO #182: Consider reshaping in run-time optimization. For now, we only consider plan appending.
+   *
    * @param submittedPhysicalPlan the physical plan to schedule.
-   * @param submittedPlanStateManager the state manager of the plan.
+   * @param maxScheduleAttempt    the max number of times this plan/sub-part of the plan should be attempted.
    */
   @Override
-  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan, final PlanStateManager submittedPlanStateManager) {
-    this.physicalPlan = submittedPhysicalPlan;
-    this.planStateManager = submittedPlanStateManager;
-
-    taskDispatcher.run(this.planStateManager);
-    LOG.info("Plan to schedule: {}", this.physicalPlan.getId());
-
-    this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices().stream()
-        .collect(Collectors.groupingBy(Stage::getScheduleGroup))
-        .entrySet().stream()
-        .sorted(Map.Entry.comparingByKey())
-        .map(Map.Entry::getValue)
-        .collect(Collectors.toList());
+  public void schedulePlan(final PhysicalPlan submittedPhysicalPlan,
+                           final int maxScheduleAttempt) {
+    LOG.info("Plan to schedule: {}", submittedPhysicalPlan.getPlanId());
+
+    if (!planStateManager.isInitialized()) {
+      // First scheduling.
+      taskDispatcher.run();
+      updatePlan(submittedPhysicalPlan, maxScheduleAttempt);
+      planStateManager.storeJSON("submitted");
+    } else {
+      // Append the submitted plan to the original plan.
+      final PhysicalPlan appendedPlan =
+          PlanAppender.appendPlan(planStateManager.getPhysicalPlan(), submittedPhysicalPlan);
+      updatePlan(appendedPlan, maxScheduleAttempt);
+      planStateManager.storeJSON("appended");
 
 Review comment:
   We can have multiple `appended` plans. Can we have a unique id for each of them?
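One way to address this, sketched below under assumptions: if `storeJSON` derives its output name from the string it is given (its implementation is not shown in this diff), every append currently reuses `"appended"`. The hypothetical helper `PlanSnapshotNames` (not part of Nemo) hands out `"appended-1"`, `"appended-2"`, and so on from a per-scheduler counter, while the first plan keeps `"submitted"`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper, not part of Nemo: produces a distinct name for each
 * plan snapshot so that successive appends do not share the same label.
 */
final class PlanSnapshotNames {
  // Number of appended plans named so far by this instance.
  private final AtomicInteger appendCount = new AtomicInteger(0);

  /** Name used for the first submitted plan (same as in the diff). */
  String submitted() {
    return "submitted";
  }

  /** Returns "appended-1", "appended-2", ... on successive calls. */
  String nextAppended() {
    return "appended-" + appendCount.incrementAndGet();
  }

  public static void main(final String[] args) {
    final PlanSnapshotNames names = new PlanSnapshotNames();
    System.out.println(names.submitted());
    System.out.println(names.nextAppended());
    System.out.println(names.nextAppended());
  }
}
```

In the `else` branch this would become something like `planStateManager.storeJSON(names.nextAppended());`, with `names` a hypothetical field of `BatchScheduler`. `AtomicInteger` keeps the counter correct if `schedulePlan` can be called concurrently; if it is only ever called from one thread, a plain `int` field would do. Using the submitted plan's `getPlanId()` as the suffix is an alternative, but only if those ids are guaranteed to be unique across submissions.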

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 