phet commented on code in PR #4037:
URL: https://github.com/apache/gobblin/pull/4037#discussion_r1731756895


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run any new dag node.
+   * If failure option is {@link org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+   * no new jobs should be orchestrated, so even if some job can run, dag should be considered finished.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    /*
+    The algo for this method is that it adds all the dag nodes into a set `canRun` that signifies all the nodes that can
+    run in this dag. This also includes all the jobs that are completed. It scans all the nodes and if the node is
+    completed it adds it to the `completed` set; if the node is failed/cancelled it removes all its dependant nodes from
+    `canRun` set. In the end if there are more nodes that "canRun" than "completed", dag is not finished.
+    For FINISH_RUNNING failure option, there is an additional condition that all the remaining `canRun` jobs should already
+    be running/orchestrated/pending_retry/pending_resume. Basically they should already be out of PENDING state, in order
+    for dag to be considered "NOT FINISHED".
+     */
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeDescendantsFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areAllParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }

Review Comment:
   This could give unexpected results if a new `ExecutionStatus` were ever added. Even if other statuses don't call for processing, at least confirm this status is among those known, and fail fast on any other.
   
   (like you do w/ `failureOption` below)
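
   One possible shape for the fail-fast check, sketched under assumptions: list every known status explicitly in a `switch` and throw from `default`, the same way the `failureOption` branch ends in `throw new RuntimeException(...)`. The `ExecutionStatus` enum below is a hypothetical stand-in holding only the values referenced in this PR (the real Gobblin enum may define more), and `NodeAction` / `StatusClassifier.classify` are illustrative names, not existing Gobblin APIs.

   ```java
   import java.util.Objects;

   // Hypothetical stand-in for Gobblin's ExecutionStatus: only the values referenced in this PR.
   enum ExecutionStatus {
     PENDING, ORCHESTRATED, RUNNING, PENDING_RETRY, PENDING_RESUME, COMPLETE, FAILED, CANCELLED
   }

   // Illustrative: what the scan in isDagFinished does with a node of a given status.
   enum NodeAction { MARK_FAILED, MARK_COMPLETE, CHECK_PARENTS, NONE }

   final class StatusClassifier {
     private StatusClassifier() {}

     static NodeAction classify(ExecutionStatus status) {
       Objects.requireNonNull(status, "status");
       switch (status) {
         case FAILED:
         case CANCELLED:
           return NodeAction.MARK_FAILED;
         case COMPLETE:
           return NodeAction.MARK_COMPLETE;
         case PENDING:
           return NodeAction.CHECK_PARENTS;
         case ORCHESTRATED:
         case RUNNING:
         case PENDING_RETRY:
         case PENDING_RESUME:
           // Known in-flight statuses: deliberately nothing to do during the scan.
           return NodeAction.NONE;
         default:
           // A status added later lands here instead of being silently ignored.
           throw new IllegalStateException("Unexpected execution status " + status);
       }
     }
   }
   ```

   With this in place, a newly added status that nobody mapped surfaces as an `IllegalStateException` on first use rather than being silently treated as "nothing to do".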



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -309,52 +311,43 @@ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
       ExecutionStatus status = node.getValue().getExecutionStatus();
       if (status == ExecutionStatus.FAILED || status == ExecutionStatus.CANCELLED) {
         anyFailure = true;
-        removeChildrenFromCanRun(node, dag, canRun);
+        removeDescendantsFromCanRun(node, dag, canRun);
         completed.add(node);
       } else if (status == ExecutionStatus.COMPLETE) {
         completed.add(node);
       } else if (status == ExecutionStatus.PENDING) {
         // Remove PENDING node if its parents are not in canRun, this means remove the pending nodes also from canRun set
         // if its parents cannot run
-        if (!areParentsInCanRun(node, canRun)) {
+        if (!areAllParentsInCanRun(node, canRun)) {
           canRun.remove(node);
         }
       }
     }
 
-    // In the end, check if there are more nodes in canRun than completed
     assert canRun.size() >= completed.size();
 
     if (!anyFailure || failureOption == DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      // In the end, check if there are more nodes in canRun than completed
       return canRun.size() == completed.size();
     } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
-      //if all the remaining jobs are pending return true
+      // if all the remaining jobs are pending return true

Review Comment:
   (I reached out over DM to find out why I'm reading something different)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run any new dag node.
+   * If failure option is {@link org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+   * no new jobs should be orchestrated, so even if some job can run, dag should be considered finished.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    /*
+    The algo for this method is that it adds all the dag nodes into a set `canRun` that signifies all the nodes that can
+    run in this dag. This also includes all the jobs that are completed. It scans all the nodes and if the node is
+    completed it adds it to the `completed` set; if the node is failed/cancelled it removes all its dependant nodes from
+    `canRun` set. In the end if there are more nodes that "canRun" than "completed", dag is not finished.
+    For FINISH_RUNNING failure option, there is an additional condition that all the remaining `canRun` jobs should already
+    be running/orchestrated/pending_retry/pending_resume. Basically they should already be out of PENDING state, in order
+    for dag to be considered "NOT FINISHED".
+     */
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = DagManagerUtils.getFailureOption(dag);

Review Comment:
   suggest moving the `failureOption` declaration to line 339
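
   For illustration, a simplified, self-contained sketch of the same algorithm with `failureOption` read only right before the `if (!anyFailure || ...)` branch that first uses it. `Node`, `SimpleDag` and `DagFinishedSketch` are hypothetical stand-ins for `Dag.DagNode<JobExecutionPlan>`, `Dag<JobExecutionPlan>` and `DagManagerUtils.getFailureOption(dag)`, the enums list only the values referenced in this PR, and `canRun.containsAll(node.parents)` plays the role of `areAllParentsInCanRun`.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.EnumSet;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical, simplified stand-ins for ExecutionStatus, FailureOption, DagNode and Dag.
   enum ExecutionStatus {
     PENDING, ORCHESTRATED, RUNNING, PENDING_RETRY, PENDING_RESUME, COMPLETE, FAILED, CANCELLED
   }

   enum FailureOption { FINISH_RUNNING, FINISH_ALL_POSSIBLE }

   final class Node {
     final String name;
     final ExecutionStatus status;
     final List<Node> parents;
     final List<Node> children = new ArrayList<>();

     Node(String name, ExecutionStatus status, Node... parents) {
       this.name = name;
       this.status = status;
       this.parents = Arrays.asList(parents);
       for (Node parent : parents) {
         parent.children.add(this); // wire up the reverse edge
       }
     }
   }

   final class SimpleDag {
     final FailureOption failureOption;
     final List<Node> nodes;

     SimpleDag(FailureOption failureOption, Node... nodes) {
       this.failureOption = failureOption;
       this.nodes = Arrays.asList(nodes);
     }
   }

   final class DagFinishedSketch {
     private static final Set<ExecutionStatus> UNFINISHED = EnumSet.of(
         ExecutionStatus.RUNNING, ExecutionStatus.PENDING_RESUME,
         ExecutionStatus.ORCHESTRATED, ExecutionStatus.PENDING_RETRY);

     private DagFinishedSketch() {}

     static boolean isDagFinished(SimpleDag dag) {
       Set<Node> canRun = new HashSet<>(dag.nodes);
       Set<Node> completed = new HashSet<>();
       boolean anyFailure = false;

       for (Node node : dag.nodes) {
         if (!canRun.contains(node)) {
           continue; // already ruled out by a failed/cancelled ancestor
         }
         if (node.status == ExecutionStatus.FAILED || node.status == ExecutionStatus.CANCELLED) {
           anyFailure = true;
           removeDescendants(node, canRun);
           completed.add(node);
         } else if (node.status == ExecutionStatus.COMPLETE) {
           completed.add(node);
         } else if (node.status == ExecutionStatus.PENDING && !canRun.containsAll(node.parents)) {
           canRun.remove(node);
         }
       }

       // Declared right before its first use, per the review suggestion.
       FailureOption failureOption = dag.failureOption;
       if (!anyFailure || failureOption == FailureOption.FINISH_ALL_POSSIBLE) {
         return canRun.size() == completed.size();
       } else if (failureOption == FailureOption.FINISH_RUNNING) {
         canRun.removeAll(completed);
         // Finished unless some still-runnable node is already in flight.
         return canRun.stream().noneMatch(n -> UNFINISHED.contains(n.status));
       }
       throw new IllegalStateException("Unexpected failure option " + failureOption);
     }

     private static void removeDescendants(Node node, Set<Node> canRun) {
       for (Node child : node.children) {
         canRun.remove(child);
         removeDescendants(child, canRun); // recursively remove all descendants
       }
     }
   }
   ```

   Keeping the declaration next to its only use makes it visible at a glance that the scan loop never depends on the failure option.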



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +293,81 @@ public static void removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run any new dag node.
+   * If failure option is {@link org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+   * no new jobs should be orchestrated, so even if some job can run, dag should be considered finished.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    /*
+    The algo for this method is that it adds all the dag nodes into a set `canRun` that signifies all the nodes that can
+    run in this dag. This also includes all the jobs that are completed. It scans all the nodes and if the node is
+    completed it adds it to the `completed` set; if the node is failed/cancelled it removes all its dependant nodes from
+    `canRun` set. In the end if there are more nodes that "canRun" than "completed", dag is not finished.
+    For FINISH_RUNNING failure option, there is an additional condition that all the remaining `canRun` jobs should already
+    be running/orchestrated/pending_retry/pending_resume. Basically they should already be out of PENDING state, in order
+    for dag to be considered "NOT FINISHED".
+     */
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeDescendantsFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areAllParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }
+    }
+
+    assert canRun.size() >= completed.size();
+
+    if (!anyFailure || failureOption == DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      // In the end, check if there are more nodes in canRun than completed
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      // if all the remaining jobs are pending return true
+      canRun.removeAll(completed);
+      List<ExecutionStatus> unfinishedStatuses = Lists.newArrayList(RUNNING, PENDING_RESUME, ORCHESTRATED, PENDING_RETRY);
+      return canRun.stream().noneMatch(node -> unfinishedStatuses.contains(node.getValue().getExecutionStatus()));
+    } else {
+      throw new RuntimeException("Unexpected failure option " + failureOption);
+    }
+  }
+
+  private static void removeDescendantsFromCanRun(Dag.DagNode<JobExecutionPlan> node, Dag<JobExecutionPlan> dag,
+      Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+    for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+      canRun.remove(child);
+      removeDescendantsFromCanRun(child, dag, canRun); // Recursively remove all descendants
+    }
+  }
+
+  private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan> node,
+      Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+    if (node.getParentNodes() == null) {
+      return true;
+    }
+    for (Dag.DagNode<JobExecutionPlan> parent : node.getParentNodes()) {
+      if (!canRun.contains(parent)) {
+        return false; // If any parent is not in canRun, return false
+      }
+    }
+    return true; // All parents are in canRun

Review Comment:
   ```
   return node.getParentNodes() == null
       || node.getParentNodes().stream().allMatch(parent -> canRun.contains(parent));
   ```
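
   A minimal, self-contained check that the suggested one-liner matches the loop it replaces, including the `null` and empty-parents cases (`allMatch` returns `true` on an empty stream and, like the loop, stops at the first non-matching element). `ParentCheck` is a hypothetical helper, plain `String`s stand in for `Dag.DagNode<JobExecutionPlan>`, and `canRun::contains` is just the method-reference spelling of `parent -> canRun.contains(parent)`.

   ```java
   import java.util.Collection;
   import java.util.Set;

   final class ParentCheck {
     private ParentCheck() {}

     // Loop form, as written in the PR (parents == null means "no parents").
     static <T> boolean areAllParentsInCanRunLoop(Collection<T> parents, Set<T> canRun) {
       if (parents == null) {
         return true;
       }
       for (T parent : parents) {
         if (!canRun.contains(parent)) {
           return false; // stop at the first parent that cannot run
         }
       }
       return true;
     }

     // Stream form, as suggested above; allMatch is true for an empty stream and short-circuits.
     static <T> boolean areAllParentsInCanRun(Collection<T> parents, Set<T> canRun) {
       return parents == null || parents.stream().allMatch(canRun::contains);
     }
   }
   ```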



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
