This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b28b468091 [GOBBLIN-2142] fix bug in determining if a dag is finished 
or not (#4037)
b28b468091 is described below

commit b28b468091e26eb632292ee46fd8aa1e97bbb15e
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Aug 26 14:43:59 2024 -0700

    [GOBBLIN-2142] fix bug in determining if a dag is finished or not (#4037)
    
    * fix bug in determining if a dag is finished or not
    * address review comments
---
 .../orchestration/DagManagementStateStore.java     |   6 -
 .../modules/orchestration/DagManagerUtils.java     |   3 +-
 .../MySqlDagManagementStateStore.java              |   8 -
 .../modules/orchestration/proc/DagProcUtils.java   |  77 ++++-
 .../orchestration/proc/ReevaluateDagProc.java      |   6 +-
 .../modules/orchestration/DagManagerUtilsTest.java | 358 +++++++++++++++++++++
 .../orchestration/proc/KillDagProcTest.java        |   1 +
 .../orchestration/proc/LaunchDagProcTest.java      |   8 +
 .../orchestration/proc/ReevaluateDagProcTest.java  |  18 +-
 .../orchestration/proc/ResumeDagProcTest.java      |   3 +
 10 files changed, 464 insertions(+), 24 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 139082545b..0a8514f274 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -143,12 +143,6 @@ public interface DagManagementStateStore {
    */
   Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);
 
-  /**
-   * Returns true if the {@link Dag} identified by the given {@link 
org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
-   * has any running job, false otherwise.
-   */
-  boolean hasRunningJobs(DagManager.DagId dagId) throws IOException;
-
   /**
    * Check if an action exists in dagAction store by flow group, flow name, 
flow execution id, and job name.
    * @param flowGroup flow group for the dag action
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 45ad84f91a..a6f6d527bf 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -219,6 +219,7 @@ public class DagManagerUtils {
         switch (failureOption) {
           case FINISH_RUNNING:
             return new HashSet<>();
+          // todo - FINISH_ALL_POSSIBLE should probably `continue` not `break`
           case FINISH_ALL_POSSIBLE:
           default:
             break;
@@ -228,7 +229,7 @@ public class DagManagerUtils {
     return nextNodesToExecute;
   }
 
-  static FailureOption getFailureOption(Dag<JobExecutionPlan> dag) {
+  public static FailureOption getFailureOption(Dag<JobExecutionPlan> dag) {
     if (dag.isEmpty()) {
       return null;
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index c0984f835b..45ee013c7d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -43,7 +43,6 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
@@ -209,13 +208,6 @@ public class MySqlDagManagementStateStore implements 
DagManagementStateStore {
     }
   }
 
-
-  @Override
-  public boolean hasRunningJobs(DagManager.DagId dagId) throws IOException {
-    return getDagNodes(dagId).stream()
-        .anyMatch(node -> 
!FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name()));
-  }
-
   @Override
   public boolean existsJobDagAction(String flowGroup, String flowName, long 
flowExecutionId, String jobName,
       DagActionStore.DagActionType dagActionType) throws IOException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 289454502f..0a9f6dcd67 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration.proc;
 import java.io.IOException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -54,7 +55,7 @@ import 
org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
-import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+import static org.apache.gobblin.service.ExecutionStatus.*;
 
 
 /**
@@ -175,7 +176,7 @@ public class DagProcUtils {
       }
       
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
 cancelJobArgs).get();
       // add back the dag node with updated states in the store
-      dagNodeToCancel.getValue().setExecutionStatus(CANCELLED);
+      dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED);
       dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
       // send cancellation event after updating the state, because 
cancellation event triggers a ReevaluateDagAction
       // that will delete the dag. Due to race condition between adding dag 
node and deleting dag, state store may get
@@ -291,4 +292,76 @@ public class DagProcUtils {
       log.warn("Failed to delete dag action {}", 
enforceFlowFinishDeadlineDagAction);
     }
   }
+
+  /**
+   * Returns true if all dag nodes are finished, and it is not possible to run 
any new dag node.
+   * If failure option is {@link 
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+   * no new jobs should be orchestrated, so even if some job can run, dag 
should be considered finished.
+   */
+  public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+    /*
+    The algo for this method is that it adds all the dag nodes into a set 
`canRun` that signifies all the nodes that can
+    run in this dag. This also includes all the jobs that are completed. It 
scans all the nodes and if the node is
+    completed it adds it to the `completed` set; if the node is 
failed/cancelled it removes all its dependant nodes from
+    `canRun` set. In the end if there are more nodes that "canRun" than 
"completed", dag is not finished.
+    For FINISH_RUNNING failure option, there is an additional condition that 
all the remaining `canRun` jobs should already
+    be running/orchestrated/pending_retry/pending_resume. Basically they 
should already be out of PENDING state, in order
+    for dag to be considered "NOT FINISHED".
+     */
+    List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+    Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+    Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+    boolean anyFailure = false;
+
+    for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+      if (!canRun.contains(node)) {
+        continue;
+      }
+      ExecutionStatus status = node.getValue().getExecutionStatus();
+      if (status == ExecutionStatus.FAILED || status == 
ExecutionStatus.CANCELLED) {
+        anyFailure = true;
+        removeDescendantsFromCanRun(node, dag, canRun);
+        completed.add(node);
+      } else if (status == ExecutionStatus.COMPLETE) {
+        completed.add(node);
+      } else if (status == ExecutionStatus.PENDING) {
+        // Remove PENDING node if its parents are not in canRun, this means 
remove the pending nodes also from canRun set
+        // if its parents cannot run
+        if (!areAllParentsInCanRun(node, canRun)) {
+          canRun.remove(node);
+        }
+      } else if (!(status == COMPILED || status == PENDING_RESUME || status == 
PENDING_RETRY || status == ORCHESTRATED ||
+                  status == RUNNING)) {
+        throw new RuntimeException("Unexpected status " + status + " for dag 
node " + node);
+      }
+    }
+
+    assert canRun.size() >= completed.size();
+
+    DagManager.FailureOption failureOption = 
DagManagerUtils.getFailureOption(dag);
+
+    if (!anyFailure || failureOption == 
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+      // In the end, check if there are more nodes in canRun than completed
+      return canRun.size() == completed.size();
+    } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+      // if all the remaining jobs are pending/compiled (basically not started 
yet) return true
+      canRun.removeAll(completed);
+      return canRun.stream().allMatch(node -> 
(node.getValue().getExecutionStatus() == PENDING || 
node.getValue().getExecutionStatus() == COMPILED));
+    } else {
+      throw new RuntimeException("Unexpected failure option " + failureOption);
+    }
+  }
+
+  private static void 
removeDescendantsFromCanRun(Dag.DagNode<JobExecutionPlan> node, 
Dag<JobExecutionPlan> dag,
+      Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+    for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+      canRun.remove(child);
+      removeDescendantsFromCanRun(child, dag, canRun); // Recursively remove 
all descendants
+    }
+  }
+
+  private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan> 
node,
+      Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+    return node.getParentNodes() == null || 
canRun.containsAll(node.getParentNodes());
+  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index d55d5a425e..ef554f6418 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -104,7 +104,7 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
       // set to PASS, which would be incorrect.
       dag.setFlowEvent(null);
       DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, 
getDagId());
-    } else if (!dagManagementStateStore.hasRunningJobs(getDagId())) {
+    } else if (DagProcUtils.isDagFinished(dag)) {
       if (dag.getFlowEvent() == null) {
         // If the dag flow event is not set and there are no more jobs 
running, then it is successful
         // also note that `onJobFinish` method does whatever is required to do 
after job finish, determining a Dag's
@@ -160,7 +160,9 @@ public class ReevaluateDagProc extends 
DagProc<Pair<Optional<Dag.DagNode<JobExec
         break;
       case COMPLETE:
         
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
-        DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId());
+        if (!DagProcUtils.isDagFinished(dag)) { // this may fail when dag 
failure option is finish_running and some dag node has failed
+          DagProcUtils.submitNextNodes(dagManagementStateStore, dag, 
getDagId());
+        }
         break;
       default:
         log.warn("It should not reach here. Job status {} is unexpected.", 
executionStatus);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
index 63c521749e..ab6509e30d 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
@@ -17,15 +17,48 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.*;
 
 
 public class DagManagerUtilsTest {
+  static String id = "1";
+  static String flowGroup = "fg";
+  static String flowName = "fn";
+  static long flowExecutionId = 12345L;
+  static String flowFailureOption = 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name();
+  static String proxyUser = "user5";
+  static Config additionalConfig = ConfigFactory.empty()
+      .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+      .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+      .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+      .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+          MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
 
   @Test
   public void testGetJobSpecFromDag() throws Exception {
@@ -38,4 +71,329 @@ public class DagManagerUtilsTest {
       Assert.assertEquals(jobSpec.getConfigAsProperties().get(key), 
jobSpec.getConfig().getString(key));
     }
   }
+
+  @Test
+  public void testIsDagFinishedSingleNode() throws URISyntaxException {
+    Dag<JobExecutionPlan> dag =
+        DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 1, 
proxyUser, additionalConfig);
+
+    setJobStatuses(dag, Collections.singletonList(COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(CANCELLED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(ORCHESTRATED));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Collections.singletonList(RUNNING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+  }
+
+  @Test
+  public void testIsDagFinishedTwoNodes() throws URISyntaxException {
+    Dag<JobExecutionPlan> dag =
+        DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, 
proxyUser, additionalConfig);
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+  }
+
+  @Test
+  public void testIsDagFinishedThreeNodes() throws URISyntaxException {
+    Dag<JobExecutionPlan> dag = buildComplexDag3();
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+  }
+
+  @Test
+  public void testIsDagFinishedFourNodes() throws URISyntaxException {
+    Dag<JobExecutionPlan> dag = buildLinearDagOf4Nodes();
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+  }
+
+  @Test
+  public void testIsDagFinishedMultiNodes() throws URISyntaxException {
+    Dag<JobExecutionPlan> dag = buildComplexDag1();
+    setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Collections.shuffle(dag.getNodes());
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    Dag<JobExecutionPlan> dag2 = buildComplexDag1();
+    setJobStatuses(dag2, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
+    Collections.shuffle(dag2.getNodes());
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
+
+    Dag<JobExecutionPlan> dag3 = buildComplexDag1();
+    setJobStatuses(dag3, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE, 
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
+    Collections.shuffle(dag3.getNodes());
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
+
+    Dag<JobExecutionPlan> dag4 = buildComplexDag1();
+    setJobStatuses(dag4, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
+    Collections.shuffle(dag4.getNodes());
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
+
+    Dag<JobExecutionPlan> dag5 = buildComplexDag1();
+    setJobStatuses(dag5, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
+    Collections.shuffle(dag5.getNodes());
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
+
+    Dag<JobExecutionPlan> dag6 = buildComplexDag1();
+    setJobStatuses(dag6, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
+    Collections.shuffle(dag6.getNodes());
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
+
+    Dag<JobExecutionPlan> dag7 = buildComplexDag1();
+    setJobStatuses(dag7, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
+    Collections.shuffle(dag7.getNodes());
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
+
+    Dag<JobExecutionPlan> dag8 = buildComplexDag1();
+    setJobStatuses(dag8, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, 
COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
+    Collections.shuffle(dag8.getNodes());
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
+
+    Dag<JobExecutionPlan> dag9 = buildComplexDag1();
+    setJobStatuses(dag9, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED, 
COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
+    Collections.shuffle(dag9.getNodes());
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
+  }
+
+  @Test
+  public void testIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws 
URISyntaxException {
+    Dag<JobExecutionPlan> dag =
+        DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, 
proxyUser, additionalConfig);
+
+    setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+  }
+
+  @Test
+  public void testIsDagFinishedWithFinishRunningFailureOptionMultiNodes() 
throws URISyntaxException {
+    Dag<JobExecutionPlan> dag = 
buildComplexDagWithFinishRunningFailureOption();
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING, 
PENDING));
+    Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+    setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING, 
PENDING));
+    Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+  }
+
+  private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus> 
statuses) {
+    int i=0;
+    for (ExecutionStatus status : statuses) {
+      dag.getNodes().get(i++).getValue().setExecutionStatus(status);
+    }
+  }
+
+  // This creates a dag like this
+  //  D0  D1  D2  D3
+  //  |   |   | \ |
+  //  D4  D5  |  D6
+  //  |   |  \|
+  //  D7  |   D8
+  //    \ |  /
+  //      D9
+
+  public static Dag<JobExecutionPlan> buildComplexDag1() throws 
URISyntaxException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+    String id = "1";
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long flowExecutionId = 12345L;
+    String flowFailureOption = 
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name();
+    String proxyUser = "user5";
+    Config additionalConfig = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+        .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
+        .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
+        .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
+            MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
+
+    for (int i = 0; i < 10; i++) {
+      String suffix = Integer.toString(i);
+      Config jobConfig = ConfigBuilder.create().
+          addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+          addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId).
+          addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, 
flowFailureOption).
+          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+      jobConfig = additionalConfig.withFallback(jobConfig);
+      if (i == 4) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0"));
+      } else if (i == 5) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job1"));
+      } if (i == 6) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job2,job3"));
+      } else if (i == 7) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job4"));
+      } else if (i == 8) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job5,job2"));
+      } else if (i == 9) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job7,job5,job8"));
+      }
+      JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
+          withTemplate(new URI("job" + suffix)).build();
+      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI(
+          ConfigUtils.getString(additionalConfig, 
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+
+  // This creates a dag like this
+  // D0 -> D1 -> D2 -> D3
+  public static Dag<JobExecutionPlan> buildLinearDagOf4Nodes() throws 
URISyntaxException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    for (int i = 0; i < 4; i++) {
+      String suffix = Integer.toString(i);
+      Config jobConfig = ConfigBuilder.create().
+          addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+          addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId).
+          addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, 
flowFailureOption).
+          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+      jobConfig = additionalConfig.withFallback(jobConfig);
+      if (i == 1) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0"));
+      } else if (i == 2) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job1"));
+      } if (i == 3) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job2"));
+      }
+      JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
+          withTemplate(new URI("job" + suffix)).build();
+      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI(
+          ConfigUtils.getString(additionalConfig, 
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+
+  // This creates a dag like this
+  // D0  D1
+  //   \/
+  //   D2
+  public static Dag<JobExecutionPlan> buildComplexDag3() throws 
URISyntaxException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    for (int i = 0; i < 3; i++) {
+      String suffix = Integer.toString(i);
+      Config jobConfig = ConfigBuilder.create().
+          addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+          addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId).
+          addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, 
flowFailureOption).
+          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+      jobConfig = additionalConfig.withFallback(jobConfig);
+      if (i == 2) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0,job1"));
+      }
+      JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
+          withTemplate(new URI("job" + suffix)).build();
+      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI(
+          ConfigUtils.getString(additionalConfig, 
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+
+  // This creates a dag like this
+  //   D0
+  //  / \
+  // D1  D2
+  //    / \
+  //   D3  D4
+  public static Dag<JobExecutionPlan> 
buildComplexDagWithFinishRunningFailureOption() throws URISyntaxException {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    for (int i = 0; i < 5; i++) {
+      String suffix = Integer.toString(i);
+      Config jobConfig = ConfigBuilder.create().
+          addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+          addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId).
+          addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+          addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+          addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, 
DagManager.FailureOption.FINISH_RUNNING.name()).
+          addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+      jobConfig = additionalConfig.withFallback(jobConfig);
+      if (i == 1) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0"));
+      } else if (i == 2) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job0"));
+      } else if (i == 3 || i == 4) {
+        jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, 
ConfigValueFactory.fromAnyRef("job2"));
+      }
+      JobSpec js = JobSpec.builder("test_job" + 
suffix).withVersion(suffix).withConfig(jobConfig).
+          withTemplate(new URI("job" + suffix)).build();
+      SpecExecutor specExecutor = 
MockedSpecExecutor.createDummySpecExecutor(new URI(
+          ConfigUtils.getString(additionalConfig, 
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, 
specExecutor);
+      jobExecutionPlans.add(jobExecutionPlan);
+    }
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index cd34b47cb9..33dc8f49e2 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -180,6 +180,7 @@ public class KillDagProcTest {
             .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef("fg"))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
                 MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     JobStatus
         jobStatus = 
JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId).
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 99caa9a005..ab426dedf1 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -29,6 +29,7 @@ import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -59,6 +60,7 @@ import 
org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
 import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
@@ -121,6 +123,7 @@ public class LaunchDagProcTest {
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
             MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
     List<SpecProducer<Spec>> specProducers = 
ReevaluateDagProcTest.getDagSpecProducers(dag);
@@ -144,6 +147,8 @@ public class LaunchDagProcTest {
     // FLOW_RUNNING is emitted exactly once per flow during the execution of 
LaunchDagProc
     Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
         .submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
+
+    
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
   }
 
   @Test
@@ -157,6 +162,7 @@ public class LaunchDagProcTest {
             .withValue(ConfigurationKeys.FLOW_NAME_KEY,  
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
                 MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
     FlowCompilationValidationHelper flowCompilationValidationHelper = 
mock(FlowCompilationValidationHelper.class);
     
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
     LaunchDagProc launchDagProc = new LaunchDagProc(
@@ -174,6 +180,8 @@ public class LaunchDagProcTest {
     // FLOW_RUNNING is emitted exactly once per flow during the execution of 
LaunchDagProc
     Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
         .submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
+
+    
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
   }
 
   // This creates a dag like this
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index bcf16c7e40..651cc441df 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -128,6 +128,9 @@ public class ReevaluateDagProcTest {
     // assert that the first job is completed
     Assert.assertEquals(ExecutionStatus.COMPLETE,
         
this.dagManagementStateStore.getDag(dagId).get().getStartNodes().get(0).getValue().getExecutionStatus());
+
+    // note that only assertFalse can be tested on DagProcUtils.isDagFinished, 
because if it had returned true, dag must have been cleaned
+    
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
   }
 
   // test when there does not exist a next job in the dag when the current 
job's reevaluate dag action is processed
@@ -161,7 +164,7 @@ public class ReevaluateDagProcTest {
     doReturn(new ImmutablePair<>(Optional.of(mockedDag.getNodes().get(0)), 
Optional.of(jobStatus)))
         .when(dagManagementStateStore).getDagNodeWithJobStatus(any());
 
-    Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));
+    
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
 
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
 
@@ -180,8 +183,6 @@ public class ReevaluateDagProcTest {
 
     
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
         .filter(a -> 
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
-
-    Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
   }
 
   @Test
@@ -195,6 +196,7 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
                 MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
     dagManagementStateStore.addDag(dag);
     doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)), 
Optional.empty()))
@@ -216,6 +218,8 @@ public class ReevaluateDagProcTest {
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(any());
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
         eq(DagActionStore.DagActionType.REEVALUATE));
+
+    
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
   }
 
   @Test
@@ -229,6 +233,7 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
                 MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
     );
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
     JobStatus jobStatus = 
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
         .jobName("job3").flowExecutionId(flowExecutionId).message("Test 
message").eventName(ExecutionStatus.COMPLETE.name())
         
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
@@ -260,6 +265,8 @@ public class ReevaluateDagProcTest {
 
     // when there are parallel jobs to launch, they are not directly sent to 
spec producers, instead reevaluate dag action is created
     specProducers.forEach(sp -> Mockito.verify(sp, 
Mockito.never()).addSpec(any()));
+
+    
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
   }
 
   @Test
@@ -271,8 +278,8 @@ public class ReevaluateDagProcTest {
             .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
             .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
             .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
-    );
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
     List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
     dagManagementStateStore.addDag(dag);
     // a job status with shouldRetry=true, it should have execution status = 
PENDING_RETRY
@@ -294,6 +301,7 @@ public class ReevaluateDagProcTest {
     specProducers.stream().skip(numOfLaunchedJobs) // separately verified 
`specProducers.get(0)`
         .forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
 
+    
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).deleteDagAction(any());
     Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any());
     Mockito.verify(dagManagementStateStore, 
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 20e47145b6..150bed10e6 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -109,5 +110,7 @@ public class ResumeDagProcTest {
      the result will be that after serializing/deserializing the test dag, the 
spec executor (and producer) type may change */
 
     Mockito.verify(this.dagManagementStateStore, 
Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
+
+    
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
   }
 }


Reply via email to