phet commented on code in PR #4037:
URL: https://github.com/apache/gobblin/pull/4037#discussion_r1730679368


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);

Review Comment:
   I don't totally understand the logic.  When the node is `COMPLETE`, why keep 
it in `canRun`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }
+    }
+
+    // In the end, check if there are more nodes in canRun than completed
+    assert canRun.size() >= completed.size();
+
+    if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      //if all the remaining jobs are pending return true
+      canRun.removeAll(completed);
+      boolean isFinished = true;
+      for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+        if (remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.RUNNING ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RESUME ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.ORCHESTRATED ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RETRY) {
+          isFinished = false;
+          break;
+        }
+      }
+      return isFinished;
+    } else {
+      throw new RuntimeException("Unexpected failure option " + failureOption);
+    }
+  }
+
+  private static void removeChildrenFromCanRun(Dag.DagNode<JobExecutionPlan> 
node, Dag<JobExecutionPlan> dag,
+      Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+    for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+      canRun.remove(child);
+      removeChildrenFromCanRun(child, dag, canRun); // Recursively remove all 
descendants
+    }
+  }
+
+  private static boolean areParentsInCanRun(Dag.DagNode<JobExecutionPlan> node,

Review Comment:
   `areAllParentsInCanRun` / `isEveryParentInCanRun`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -219,6 +219,7 @@ public static Set<DagNode<JobExecutionPlan>> 
getNext(Dag<JobExecutionPlan> dag)
         switch (failureOption) {
           case FINISH_RUNNING:
             return new HashSet<>();
+          // todo - FINISH_ALL_POSSIBLE should probably `continue` not `break`

Review Comment:
   seems a simple enough change... modify now?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinished() throws URISyntaxException {
+    long flowExecutionId = 12345L;
+    String flowGroup = "fg";
+    String flowName = "fn";
+
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+
+    setJobStatuses(dag, Collections.singletonList(COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(CANCELLED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(RUNNING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),

Review Comment:
   rather than one single massive test method, could this be the start of a 
second test (e.g. single hop vs. multi)?
   
   (this same advice goes for each of the several other re-assignments to `dag` 
that follow)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }
+    }
+
+    // In the end, check if there are more nodes in canRun than completed
+    assert canRun.size() >= completed.size();
+
+    if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      //if all the remaining jobs are pending return true
+      canRun.removeAll(completed);
+      boolean isFinished = true;
+      for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+        if (remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.RUNNING ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RESUME ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.ORCHESTRATED ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RETRY) {
+          isFinished = false;
+          break;
+        }
+      }
+      return isFinished;

Review Comment:
   ```
   List<ExecutionStatus> unfinishedStatuses = Lists.newArrayList(RUNNING, 
PENDING_RESUME, ORCHESTRATED, PENDING_RETRY);
   return canRun.stream().noneMatch(node -> 
unfinishedStatuses.contains(node.getValue().getExecutionStatus()));
   ```
   ?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinished() throws URISyntaxException {

Review Comment:
   Given the method lives in `DagProcUtils`, why put its test into 
`DagManagerUtilsTest`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);

Review Comment:
   Is there a specific ordering guarantee for `Dag<T>::getNodes`?  How do 
we know this parent would be executed after the children, such that its 
children don't get added to `canRun`, even though their parent's failure should 
mean the child isn't ready to run?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void 
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeChildrenFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      }
+    }
+
+    // In the end, check if there are more nodes in canRun than completed
+    assert canRun.size() >= completed.size();
+
+    if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      //if all the remaining jobs are pending return true
+      canRun.removeAll(completed);
+      boolean isFinished = true;
+      for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+        if (remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.RUNNING ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RESUME ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.ORCHESTRATED ||
+            remainingNode.getValue().getExecutionStatus() == 
ExecutionStatus.PENDING_RETRY) {
+          isFinished = false;
+          break;
+        }
+      }
+      return isFinished;
+    } else {
+      throw new RuntimeException("Unexpected failure option " + failureOption);
+    }
+  }
+
+  private static void removeChildrenFromCanRun(Dag.DagNode<JobExecutionPlan> 
node, Dag<JobExecutionPlan> dag,

Review Comment:
   seems this should be named `removeDescendantsFromCanRun`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinished() throws URISyntaxException {
+    long flowExecutionId = 12345L;
+    String flowGroup = "fg";
+    String flowName = "fn";
+
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+
+    setJobStatuses(dag, Collections.singletonList(COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(CANCELLED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(RUNNING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));

Review Comment:
   didn't the impl have `ORCHESTRATED` as another one?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinished() throws URISyntaxException {
+    long flowExecutionId = 12345L;
+    String flowGroup = "fg";
+    String flowName = "fn";
+
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        1, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+
+    setJobStatuses(dag, Collections.singletonList(COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(CANCELLED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(RUNNING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    dag = DagManagerTest.buildDag("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+        2, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+    );
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    dag = buildComplexDag2("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    dag = buildComplexDag3("1", flowExecutionId, 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    dag = buildComplexDag1("1", flowExecutionId,
+        DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5", 
ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY,  
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE, 
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED, 
COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+  }
+
+  @Test
+  public void testIsDagFinishedWithFinishRunningFailureOption() throws 
URISyntaxException {
+    long flowExecutionId = 12345L;
+    String flowGroup = "fg";
+    String flowName = "fn";
+    Dag<JobExecutionPlan> dag = buildComplexDag4("1", flowExecutionId, 
DagManager.FailureOption.FINISH_RUNNING.name(), "user5",
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING, 
PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING, 
PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+  }
+
+  private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus> 
statuses) {
+    int i=0;
+    for (ExecutionStatus status : statuses) {
+      dag.getNodes().get(i++).getValue().setExecutionStatus(status);
+    }
+  }
+
+  // This creates a dag like this
+  //  D0  D1  D2  D3
+  //  |   |   | \ |
+  //  D4  D5  |  D6
+  //  |   |  \|
+  //  D7  |   D8
+  //    \ |  /
+  //      D9
+
+  public static Dag<JobExecutionPlan> buildComplexDag1(String id, long 
flowExecutionId,
+      String flowFailureOption, String proxyUser, Config additionalConfig) 
throws URISyntaxException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      String suffix = Integer.toString(i);
+      Config jobConfig = ConfigBuilder.create().
+          addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+          addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId).
+          addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, 
flowFailureOption).
+          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+      jobConfig = additionalConfig.withFallback(jobConfig);
+      if (i == 4) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0"));
+      } else if (i == 5) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job1"));
+      } if (i == 6) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job2,job3"));
+      } else if (i == 7) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job4"));
+      } else if (i == 8) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job5,job2"));
+      } else if (i == 9) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job7,job5,job8"));
+      }
+      JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
+          withTemplate(new URI("job" + suffix)).build();
+      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI(
+          ConfigUtils.getString(additionalConfig, 
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+
+  public static Dag<JobExecutionPlan> buildComplexDag2(String id, long 
flowExecutionId,

Review Comment:
   the picture for the one before was helpful.  what about something analogous 
here?
   
   (same w/ the others that follow)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to