sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172728122
 
 

 ##########
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 ##########
 @@ -526,8 +403,133 @@ private PhysicalStage getStageById(final String stageId) 
{
     throw new RuntimeException(new Throwable("This taskGroupId does not exist 
in the plan"));
   }
 
-  @Override
-  public void terminate() {
-    // nothing to do yet.
+  /**
+   * Action after task group execution has been completed, not after it has 
been put on hold.
+   *
+   * @param executorId  the ID of the executor.
+   * @param taskGroupId the ID of the task group completed.
+   */
+  private void onTaskGroupExecutionComplete(final String executorId,
+                                            final String taskGroupId) {
+    onTaskGroupExecutionComplete(executorId, taskGroupId, false);
+  }
+
+  /**
+   * Action after task group execution has been completed.
+   * @param executorId id of the executor.
+   * @param taskGroupId the ID of the task group completed.
+   * @param isOnHoldToComplete whether it is switched to complete 
after it has been on hold.
+   */
+  private void onTaskGroupExecutionComplete(final String executorId,
+                                            final String taskGroupId,
+                                            final Boolean isOnHoldToComplete) {
+    LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
+    if (!isOnHoldToComplete) {
+      schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+    }
+
+    final String stageIdForTaskGroupUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+    if 
(jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion)) {
+      // if the stage this task group belongs to is complete,
+      if (!jobStateManager.checkJobTermination()) { // and if the job is not 
yet complete or failed,
+        scheduleNextStage(stageIdForTaskGroupUponCompletion);
+      }
+    }
+  }
+
+  /**
+   * Action after task group execution is put on hold.
+   * @param executorId     the ID of the executor.
+   * @param taskGroupId    the ID of the task group.
+   * @param taskPutOnHold  the ID of task that is put on hold.
+   */
+  private void onTaskGroupExecutionOnHold(final String executorId,
+                                          final String taskGroupId,
+                                          final String taskPutOnHold) {
+    LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
+    schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+    final String stageIdForTaskGroupUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+
+    final boolean stageComplete =
+        
jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion);
+
+    if (stageComplete) {
+      // get optimization vertex from the task.
+      final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
+          getTaskGroupDagById(taskGroupId).getVertices().stream() // get tasks 
list
+              .filter(task -> task.getId().equals(taskPutOnHold)) // find it
+              .map(physicalPlan::getIRVertexOf) // get the corresponding 
IRVertex, the MetricCollectionBarrierVertex
+              .filter(irVertex -> irVertex instanceof 
MetricCollectionBarrierVertex)
+              .distinct()
+              .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // 
convert types
+              .findFirst().orElseThrow(() -> new 
RuntimeException(ON_HOLD.name() // get it
+              + " called with failed task ids by some other task than "
+              + MetricCollectionBarrierTask.class.getSimpleName()));
+      // and we will use this vertex to perform metric collection and dynamic 
optimization.
+
+      pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
+          new DynamicOptimizationEvent(physicalPlan, 
metricCollectionBarrierVertex, Pair.of(executorId, taskGroupId)));
+    } else {
+      onTaskGroupExecutionComplete(executorId, taskGroupId, true);
+    }
+  }
+
+  private void onTaskGroupExecutionFailedRecoverable(final String executorId, 
final String taskGroupId,
 
 Review comment:
   Please add a comment about this method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to