This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b28b468091 [GOBBLIN-2142] fix bug in determining if a dag is finished
or not (#4037)
b28b468091 is described below
commit b28b468091e26eb632292ee46fd8aa1e97bbb15e
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Aug 26 14:43:59 2024 -0700
[GOBBLIN-2142] fix bug in determining if a dag is finished or not (#4037)
* fix bug in determining if a dag is finished or not
* address review comments
---
.../orchestration/DagManagementStateStore.java | 6 -
.../modules/orchestration/DagManagerUtils.java | 3 +-
.../MySqlDagManagementStateStore.java | 8 -
.../modules/orchestration/proc/DagProcUtils.java | 77 ++++-
.../orchestration/proc/ReevaluateDagProc.java | 6 +-
.../modules/orchestration/DagManagerUtilsTest.java | 358 +++++++++++++++++++++
.../orchestration/proc/KillDagProcTest.java | 1 +
.../orchestration/proc/LaunchDagProcTest.java | 8 +
.../orchestration/proc/ReevaluateDagProcTest.java | 18 +-
.../orchestration/proc/ResumeDagProcTest.java | 3 +
10 files changed, 464 insertions(+), 24 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
index 139082545b..0a8514f274 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java
@@ -143,12 +143,6 @@ public interface DagManagementStateStore {
*/
Optional<JobStatus> getJobStatus(DagNodeId dagNodeId);
- /**
- * Returns true if the {@link Dag} identified by the given {@link
org.apache.gobblin.service.modules.orchestration.DagManager.DagId}
- * has any running job, false otherwise.
- */
- boolean hasRunningJobs(DagManager.DagId dagId) throws IOException;
-
/**
* Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
* @param flowGroup flow group for the dag action
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 45ad84f91a..a6f6d527bf 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -219,6 +219,7 @@ public class DagManagerUtils {
switch (failureOption) {
case FINISH_RUNNING:
return new HashSet<>();
+ // todo - FINISH_ALL_POSSIBLE should probably `continue` not `break`
case FINISH_ALL_POSSIBLE:
default:
break;
@@ -228,7 +229,7 @@ public class DagManagerUtils {
return nextNodesToExecute;
}
- static FailureOption getFailureOption(Dag<JobExecutionPlan> dag) {
+ public static FailureOption getFailureOption(Dag<JobExecutionPlan> dag) {
if (dag.isEmpty()) {
return null;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
index c0984f835b..45ee013c7d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java
@@ -43,7 +43,6 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
@@ -209,13 +208,6 @@ public class MySqlDagManagementStateStore implements
DagManagementStateStore {
}
}
-
- @Override
- public boolean hasRunningJobs(DagManager.DagId dagId) throws IOException {
- return getDagNodes(dagId).stream()
- .anyMatch(node ->
!FlowStatusGenerator.FINISHED_STATUSES.contains(node.getValue().getExecutionStatus().name()));
- }
-
@Override
public boolean existsJobDagAction(String flowGroup, String flowName, long
flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 289454502f..0a9f6dcd67 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -54,7 +55,7 @@ import
org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
-import static org.apache.gobblin.service.ExecutionStatus.CANCELLED;
+import static org.apache.gobblin.service.ExecutionStatus.*;
/**
@@ -175,7 +176,7 @@ public class DagProcUtils {
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
cancelJobArgs).get();
// add back the dag node with updated states in the store
- dagNodeToCancel.getValue().setExecutionStatus(CANCELLED);
+ dagNodeToCancel.getValue().setExecutionStatus(ExecutionStatus.CANCELLED);
dagManagementStateStore.addDagNodeState(dagNodeToCancel, dagId);
// send cancellation event after updating the state, because
cancellation event triggers a ReevaluateDagAction
// that will delete the dag. Due to race condition between adding dag
node and deleting dag, state store may get
@@ -291,4 +292,76 @@ public class DagProcUtils {
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes that can run are finished, so that it is not
possible to run any new dag node.
+ * If the failure option is {@link
org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption#FINISH_RUNNING},
+ * no new jobs should be orchestrated after a failure, so the dag is considered
finished even if some not-yet-started job could still run.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ /*
+ The algo for this method is that it adds all the dag nodes into a set
`canRun` that signifies all the nodes that can
+ run in this dag. This also includes all the jobs that are completed. It
scans all the nodes and if the node is
+ completed it adds it to the `completed` set; if the node is
failed/cancelled it also adds it to `completed` and removes all its descendant nodes from
+ the `canRun` set. In the end, if there are more nodes that "canRun" than
"completed", the dag is not finished.
+ For FINISH_RUNNING failure option, there is an additional condition that
all the remaining `canRun` jobs should already
+ be running/orchestrated/pending_retry/pending_resume. Basically they
should already be out of PENDING state, in order
+ for dag to be considered "NOT FINISHED".
+ */
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeDescendantsFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove a PENDING node from the canRun set if any of its parents is not in
canRun, because a node cannot run
+ // when one of its parents cannot run
+ if (!areAllParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ } else if (!(status == COMPILED || status == PENDING_RESUME || status ==
PENDING_RETRY || status == ORCHESTRATED ||
+ status == RUNNING)) {
+ throw new RuntimeException("Unexpected status " + status + " for dag
node " + node);
+ }
+ }
+
+ assert canRun.size() >= completed.size();
+
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+
+ if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ // In the end, check if there are more nodes in canRun than completed
+ return canRun.size() == completed.size();
+ } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+ // if all the remaining jobs are pending/compiled (basically not started
yet) return true
+ canRun.removeAll(completed);
+ return canRun.stream().allMatch(node ->
(node.getValue().getExecutionStatus() == PENDING ||
node.getValue().getExecutionStatus() == COMPILED));
+ } else {
+ throw new RuntimeException("Unexpected failure option " + failureOption);
+ }
+ }
+
+ private static void
removeDescendantsFromCanRun(Dag.DagNode<JobExecutionPlan> node,
Dag<JobExecutionPlan> dag,
+ Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+ for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+ canRun.remove(child);
+ removeDescendantsFromCanRun(child, dag, canRun); // Recursively remove
all descendants
+ }
+ }
+
+ private static boolean areAllParentsInCanRun(Dag.DagNode<JobExecutionPlan>
node,
+ Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+ return node.getParentNodes() == null ||
canRun.containsAll(node.getParentNodes());
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index d55d5a425e..ef554f6418 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -104,7 +104,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
// set to PASS, which would be incorrect.
dag.setFlowEvent(null);
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
- } else if (!dagManagementStateStore.hasRunningJobs(getDagId())) {
+ } else if (DagProcUtils.isDagFinished(dag)) {
if (dag.getFlowEvent() == null) {
// If the dag flow event is not set and there are no more jobs
running, then it is successful
// also note that `onJobFinish` method does whatever is required to do
after job finish, determining a Dag's
@@ -160,7 +160,9 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
break;
case COMPLETE:
dagManagementStateStore.getDagManagerMetrics().incrementExecutorSuccess(dagNode);
- DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId());
+ if (!DagProcUtils.isDagFinished(dag)) { // this may fail when dag
failure option is finish_running and some dag node has failed
+ DagProcUtils.submitNextNodes(dagManagementStateStore, dag,
getDagId());
+ }
break;
default:
log.warn("It should not reach here. Job status {} is unexpected.",
executionStatus);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
index 63c521749e..ab6509e30d 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
@@ -17,15 +17,48 @@
package org.apache.gobblin.service.modules.orchestration;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
import org.testng.Assert;
import org.testng.annotations.Test;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.proc.DagProcUtils;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ConfigUtils;
+
+import static org.apache.gobblin.service.ExecutionStatus.*;
public class DagManagerUtilsTest {
+ static String id = "1";
+ static String flowGroup = "fg";
+ static String flowName = "fn";
+ static long flowExecutionId = 12345L;
+ static String flowFailureOption =
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name();
+ static String proxyUser = "user5";
+ static Config additionalConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
@Test
public void testGetJobSpecFromDag() throws Exception {
@@ -38,4 +71,329 @@ public class DagManagerUtilsTest {
Assert.assertEquals(jobSpec.getConfigAsProperties().get(key),
jobSpec.getConfig().getString(key));
}
}
+
+ @Test
+ public void testIsDagFinishedSingleNode() throws URISyntaxException {
+ Dag<JobExecutionPlan> dag =
+ DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 1,
proxyUser, additionalConfig);
+
+ setJobStatuses(dag, Collections.singletonList(COMPLETE));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(FAILED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(CANCELLED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(ORCHESTRATED));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(RUNNING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ }
+
+ @Test
+ public void testIsDagFinishedTwoNodes() throws URISyntaxException {
+ Dag<JobExecutionPlan> dag =
+ DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2,
proxyUser, additionalConfig);
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ }
+
+ @Test
+ public void testIsDagFinishedThreeNodes() throws URISyntaxException {
+ Dag<JobExecutionPlan> dag = buildComplexDag3();
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ }
+
+ @Test
+ public void testIsDagFinishedFourNodes() throws URISyntaxException {
+ Dag<JobExecutionPlan> dag = buildLinearDagOf4Nodes();
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ }
+
+ @Test
+ public void testIsDagFinishedMultiNodes() throws URISyntaxException {
+ Dag<JobExecutionPlan> dag = buildComplexDag1();
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Collections.shuffle(dag.getNodes());
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ Dag<JobExecutionPlan> dag2 = buildComplexDag1();
+ setJobStatuses(dag2, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
+ Collections.shuffle(dag2.getNodes());
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
+
+ Dag<JobExecutionPlan> dag3 = buildComplexDag1();
+ setJobStatuses(dag3, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE,
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
+ Collections.shuffle(dag3.getNodes());
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
+
+ Dag<JobExecutionPlan> dag4 = buildComplexDag1();
+ setJobStatuses(dag4, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
+ Collections.shuffle(dag4.getNodes());
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
+
+ Dag<JobExecutionPlan> dag5 = buildComplexDag1();
+ setJobStatuses(dag5, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
+ Collections.shuffle(dag5.getNodes());
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
+
+ Dag<JobExecutionPlan> dag6 = buildComplexDag1();
+ setJobStatuses(dag6, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
+ Collections.shuffle(dag6.getNodes());
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
+
+ Dag<JobExecutionPlan> dag7 = buildComplexDag1();
+ setJobStatuses(dag7, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
+ Collections.shuffle(dag7.getNodes());
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
+
+ Dag<JobExecutionPlan> dag8 = buildComplexDag1();
+ setJobStatuses(dag8, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
+ Collections.shuffle(dag8.getNodes());
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
+
+ Dag<JobExecutionPlan> dag9 = buildComplexDag1();
+ setJobStatuses(dag9, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED,
COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
+ Collections.shuffle(dag9.getNodes());
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
+ }
+
+ @Test
+ public void testIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws
URISyntaxException {
+ Dag<JobExecutionPlan> dag =
+ DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2,
proxyUser, additionalConfig);
+
+ setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ }
+
+ @Test
+ public void testIsDagFinishedWithFinishRunningFailureOptionMultiNodes()
throws URISyntaxException {
+ Dag<JobExecutionPlan> dag =
buildComplexDagWithFinishRunningFailureOption();
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING,
PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING,
PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ }
+
+ private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus>
statuses) {
+ int i=0;
+ for (ExecutionStatus status : statuses) {
+ dag.getNodes().get(i++).getValue().setExecutionStatus(status);
+ }
+ }
+
+ // This creates a dag like this
+ // D0 D1 D2 D3
+ // | | | \ |
+ // D4 D5 | D6
+ // | | \|
+ // D7 | D8
+ // \ | /
+ // D9
+
+ public static Dag<JobExecutionPlan> buildComplexDag1() throws
URISyntaxException {
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+ String id = "1";
+ String flowGroup = "fg";
+ String flowName = "fn";
+ long flowExecutionId = 12345L;
+ String flowFailureOption =
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name();
+ String proxyUser = "user5";
+ Config additionalConfig = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI));
+
+ for (int i = 0; i < 10; i++) {
+ String suffix = Integer.toString(i);
+ Config jobConfig = ConfigBuilder.create().
+ addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+ addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId).
+ addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+ addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION,
flowFailureOption).
+ addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+ jobConfig = additionalConfig.withFallback(jobConfig);
+ if (i == 4) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0"));
+ } else if (i == 5) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job1"));
+ } if (i == 6) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job2,job3"));
+ } else if (i == 7) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job4"));
+ } else if (i == 8) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job5,job2"));
+ } else if (i == 9) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job7,job5,job8"));
+ }
+ JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
+ withTemplate(new URI("job" + suffix)).build();
+ SpecExecutor specExecutor =
MockedSpecExecutor.createDummySpecExecutor(new URI(
+ ConfigUtils.getString(additionalConfig,
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
+ jobExecutionPlans.add(jobExecutionPlan);
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
+
+ // This creates a dag like this
+ // D0 -> D1 -> D2 -> D3
+ public static Dag<JobExecutionPlan> buildLinearDagOf4Nodes() throws
URISyntaxException {
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+ for (int i = 0; i < 4; i++) {
+ String suffix = Integer.toString(i);
+ Config jobConfig = ConfigBuilder.create().
+ addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+ addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId).
+ addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+ addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION,
flowFailureOption).
+ addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+ jobConfig = additionalConfig.withFallback(jobConfig);
+ if (i == 1) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0"));
+ } else if (i == 2) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job1"));
+ } if (i == 3) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job2"));
+ }
+ JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
+ withTemplate(new URI("job" + suffix)).build();
+ SpecExecutor specExecutor =
MockedSpecExecutor.createDummySpecExecutor(new URI(
+ ConfigUtils.getString(additionalConfig,
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
+ jobExecutionPlans.add(jobExecutionPlan);
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
+
+ // This creates a dag like this
+ // D0 D1
+ // \/
+ // D2
+ public static Dag<JobExecutionPlan> buildComplexDag3() throws
URISyntaxException {
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+ for (int i = 0; i < 3; i++) {
+ String suffix = Integer.toString(i);
+ Config jobConfig = ConfigBuilder.create().
+ addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+ addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId).
+ addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+ addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION,
flowFailureOption).
+ addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+ jobConfig = additionalConfig.withFallback(jobConfig);
+ if (i == 2) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0,job1"));
+ }
+ JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
+ withTemplate(new URI("job" + suffix)).build();
+ SpecExecutor specExecutor =
MockedSpecExecutor.createDummySpecExecutor(new URI(
+ ConfigUtils.getString(additionalConfig,
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
+ jobExecutionPlans.add(jobExecutionPlan);
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
+
+ // This creates a dag like this
+ // D0
+ // / \
+ // D1 D2
+ // / \
+ // D3 D4
+ public static Dag<JobExecutionPlan>
buildComplexDagWithFinishRunningFailureOption() throws URISyntaxException {
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ String suffix = Integer.toString(i);
+ Config jobConfig = ConfigBuilder.create().
+ addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+ addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId).
+ addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+ addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION,
DagManager.FailureOption.FINISH_RUNNING.name()).
+ addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+ jobConfig = additionalConfig.withFallback(jobConfig);
+ if (i == 1) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0"));
+ } else if (i == 2) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0"));
+ } else if (i == 3 || i == 4) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job2"));
+ }
+ JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
+ withTemplate(new URI("job" + suffix)).build();
+ SpecExecutor specExecutor =
MockedSpecExecutor.createDummySpecExecutor(new URI(
+ ConfigUtils.getString(additionalConfig,
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
+ jobExecutionPlans.add(jobExecutionPlan);
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
index cd34b47cb9..33dc8f49e2 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java
@@ -180,6 +180,7 @@ public class KillDagProcTest {
.withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg"))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
JobStatus
jobStatus =
JobStatus.builder().flowName("job0").flowGroup("fg").jobGroup("fg").jobName("job0").flowExecutionId(flowExecutionId).
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index 99caa9a005..ab426dedf1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -29,6 +29,7 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -59,6 +60,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
@@ -121,6 +123,7 @@ public class LaunchDagProcTest {
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
List<SpecProducer<Spec>> specProducers =
ReevaluateDagProcTest.getDagSpecProducers(dag);
@@ -144,6 +147,8 @@ public class LaunchDagProcTest {
// FLOW_RUNNING is emitted exactly once per flow during the execution of
LaunchDagProc
Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
.submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
+
+
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}
@Test
@@ -157,6 +162,7 @@ public class LaunchDagProcTest {
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
LaunchDagProc launchDagProc = new LaunchDagProc(
@@ -174,6 +180,8 @@ public class LaunchDagProcTest {
// FLOW_RUNNING is emitted exactly once per flow during the execution of
LaunchDagProc
Mockito.verify(this.mockedEventSubmitter, Mockito.times(1))
.submit(eq(TimingEvent.FlowTimings.FLOW_RUNNING), anyMap());
+
+
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}
// This creates a dag like this
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
index bcf16c7e40..651cc441df 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java
@@ -128,6 +128,9 @@ public class ReevaluateDagProcTest {
// assert that the first job is completed
Assert.assertEquals(ExecutionStatus.COMPLETE,
this.dagManagementStateStore.getDag(dagId).get().getStartNodes().get(0).getValue().getExecutionStatus());
+
+ // Note: only assertFalse can be checked on DagProcUtils.isDagFinished here,
because had it returned true, the dag would already have been cleaned up
+
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}
// test when there does not exist a next job in the dag when the current
job's reevaluate dag action is processed
@@ -161,7 +164,7 @@ public class ReevaluateDagProcTest {
doReturn(new ImmutablePair<>(Optional.of(mockedDag.getNodes().get(0)),
Optional.of(jobStatus)))
.when(dagManagementStateStore).getDagNodeWithJobStatus(any());
- Assert.assertTrue(dagManagementStateStore.hasRunningJobs(dagId));
+
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
@@ -180,8 +183,6 @@ public class ReevaluateDagProcTest {
Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream()
.filter(a ->
a.getMethod().getName().equals("deleteDagAction")).count(), 1);
-
- Assert.assertFalse(dagManagementStateStore.hasRunningJobs(dagId));
}
@Test
@@ -195,6 +196,7 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
dagManagementStateStore.addDag(dag);
doReturn(new ImmutablePair<>(Optional.of(dag.getNodes().get(0)),
Optional.empty()))
@@ -216,6 +218,8 @@ public class ReevaluateDagProcTest {
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(any());
Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
eq(DagActionStore.DagActionType.REEVALUATE));
+
+
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}
@Test
@@ -229,6 +233,7 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
);
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
JobStatus jobStatus =
JobStatus.builder().flowName(flowName).flowGroup(flowGroup).jobGroup(flowGroup)
.jobName("job3").flowExecutionId(flowExecutionId).message("Test
message").eventName(ExecutionStatus.COMPLETE.name())
.startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build();
@@ -260,6 +265,8 @@ public class ReevaluateDagProcTest {
// when there are parallel jobs to launch, they are not directly sent to
spec producers, instead reevaluate dag action is created
specProducers.forEach(sp -> Mockito.verify(sp,
Mockito.never()).addSpec(any()));
+
+
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}
@Test
@@ -271,8 +278,8 @@ public class ReevaluateDagProcTest {
.withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
.withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
.withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
- MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
- );
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
List<SpecProducer<Spec>> specProducers = getDagSpecProducers(dag);
dagManagementStateStore.addDag(dag);
// a job status with shouldRetry=true, it should have execution status =
PENDING_RETRY
@@ -294,6 +301,7 @@ public class ReevaluateDagProcTest {
specProducers.stream().skip(numOfLaunchedJobs) // separately verified
`specProducers.get(0)`
.forEach(sp -> Mockito.verify(sp, Mockito.never()).addSpec(any()));
+
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
Mockito.verify(dagManagementStateStore,
Mockito.never()).deleteDagAction(any());
Mockito.verify(dagManagementStateStore, Mockito.never()).deleteDag(any());
Mockito.verify(dagManagementStateStore,
Mockito.never()).addJobDagAction(any(), any(), anyLong(), any(),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
index 20e47145b6..150bed10e6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -109,5 +110,7 @@ public class ResumeDagProcTest {
the result will be that after serializing/deserializing the test dag, the
spec executor (and producer) type may change */
Mockito.verify(this.dagManagementStateStore,
Mockito.times(expectedNumOfResumedJobs)).addDagNodeState(any(), any());
+
+
Assert.assertFalse(DagProcUtils.isDagFinished(this.dagManagementStateStore.getDag(dagId).get()));
}
}