[
https://issues.apache.org/jira/browse/GOBBLIN-2142?focusedWorklogId=931648&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-931648
]
ASF GitHub Bot logged work on GOBBLIN-2142:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 26/Aug/24 05:44
Start Date: 26/Aug/24 05:44
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4037:
URL: https://github.com/apache/gobblin/pull/4037#discussion_r1730679368
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeChildrenFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
Review Comment:
I don't totally understand the logic. When the node is `COMPLETE`, why keep
it in `canRun`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeChildrenFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
+ // if its parents cannot run
+ if (!areParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ }
+ }
+
+ // In the end, check if there are more nodes in canRun than completed
+ assert canRun.size() >= completed.size();
+
+ if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ return canRun.size() == completed.size();
+ } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+ //if all the remaining jobs are pending return true
+ canRun.removeAll(completed);
+ boolean isFinished = true;
+ for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+ if (remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.RUNNING ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.PENDING_RESUME ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.ORCHESTRATED ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.PENDING_RETRY) {
+ isFinished = false;
+ break;
+ }
+ }
+ return isFinished;
+ } else {
+ throw new RuntimeException("Unexpected failure option " + failureOption);
+ }
+ }
+
+ private static void removeChildrenFromCanRun(Dag.DagNode<JobExecutionPlan>
node, Dag<JobExecutionPlan> dag,
+ Set<Dag.DagNode<JobExecutionPlan>> canRun) {
+ for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
+ canRun.remove(child);
+ removeChildrenFromCanRun(child, dag, canRun); // Recursively remove all
descendants
+ }
+ }
+
+ private static boolean areParentsInCanRun(Dag.DagNode<JobExecutionPlan> node,
Review Comment:
`areAllParentsInCanRun` / `isEveryParentInCanRun`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java:
##########
@@ -219,6 +219,7 @@ public static Set<DagNode<JobExecutionPlan>>
getNext(Dag<JobExecutionPlan> dag)
switch (failureOption) {
case FINISH_RUNNING:
return new HashSet<>();
+ // todo - FINISH_ALL_POSSIBLE should probably `continue` not `break`
Review Comment:
seems a simple enough change... modify now?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
Assert.assertEquals(jobSpec.getConfigAsProperties().get(key),
jobSpec.getConfig().getString(key));
}
}
+
+ @Test
+ public void testIsDagFinished() throws URISyntaxException {
+ long flowExecutionId = 12345L;
+ String flowGroup = "fg";
+ String flowName = "fn";
+
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 1, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ );
+
+ setJobStatuses(dag, Collections.singletonList(COMPLETE));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(FAILED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(CANCELLED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(RUNNING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
Review Comment:
rather than one single massive test method, could this be the start of a
second test (e.g. single-hop vs. multi-hop)?
(the same advice applies to each of the several other re-assignments to `dag`
that follow)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeChildrenFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
+ // if its parents cannot run
+ if (!areParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ }
+ }
+
+ // In the end, check if there are more nodes in canRun than completed
+ assert canRun.size() >= completed.size();
+
+ if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ return canRun.size() == completed.size();
+ } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+ //if all the remaining jobs are pending return true
+ canRun.removeAll(completed);
+ boolean isFinished = true;
+ for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+ if (remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.RUNNING ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.PENDING_RESUME ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.ORCHESTRATED ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.PENDING_RETRY) {
+ isFinished = false;
+ break;
+ }
+ }
+ return isFinished;
Review Comment:
```
List<ExecutionStatus> unfinishedStatuses = Lists.newArrayList(RUNNING,
PENDING_RESUME, ORCHESTRATED, PENDING_RETRY);
return canRun.stream().noneMatch(node ->
unfinishedStatuses.contains(node.getValue().getExecutionStatus()));
```
?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
Assert.assertEquals(jobSpec.getConfigAsProperties().get(key),
jobSpec.getConfig().getString(key));
}
}
+
+ @Test
+ public void testIsDagFinished() throws URISyntaxException {
Review Comment:
given the method lives in `DagProcUtils`, why put it into
`DagManagerUtilsTest`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeChildrenFromCanRun(node, dag, canRun);
Review Comment:
is there a specific ordering guarantee for `Dag<T>::getNodes`? how do
we know this parent would be visited before its children, such that its
children don't remain in `canRun`, even though their parent's failure should
mean the child isn't ready to run?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -291,4 +291,79 @@ public static void
removeFlowFinishDeadlineDagAction(DagManagementStateStore dag
log.warn("Failed to delete dag action {}",
enforceFlowFinishDeadlineDagAction);
}
}
+
+ /**
+ * Returns true if all dag nodes are finished, and it is not possible to run
any new dag node.
+ */
+ public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
+ List<Dag.DagNode<JobExecutionPlan>> nodes = dag.getNodes();
+ Set<Dag.DagNode<JobExecutionPlan>> canRun = new HashSet<>(nodes);
+ Set<Dag.DagNode<JobExecutionPlan>> completed = new HashSet<>();
+ DagManager.FailureOption failureOption =
DagManagerUtils.getFailureOption(dag);
+ boolean anyFailure = false;
+
+ for (Dag.DagNode<JobExecutionPlan> node : nodes) {
+ if (!canRun.contains(node)) {
+ continue;
+ }
+ ExecutionStatus status = node.getValue().getExecutionStatus();
+ if (status == ExecutionStatus.FAILED || status ==
ExecutionStatus.CANCELLED) {
+ anyFailure = true;
+ removeChildrenFromCanRun(node, dag, canRun);
+ completed.add(node);
+ } else if (status == ExecutionStatus.COMPLETE) {
+ completed.add(node);
+ } else if (status == ExecutionStatus.PENDING) {
+ // Remove PENDING node if its parents are not in canRun, this means
remove the pending nodes also from canRun set
+ // if its parents cannot run
+ if (!areParentsInCanRun(node, canRun)) {
+ canRun.remove(node);
+ }
+ }
+ }
+
+ // In the end, check if there are more nodes in canRun than completed
+ assert canRun.size() >= completed.size();
+
+ if (!anyFailure || failureOption ==
DagManager.FailureOption.FINISH_ALL_POSSIBLE) {
+ return canRun.size() == completed.size();
+ } else if (failureOption == DagManager.FailureOption.FINISH_RUNNING) {
+ //if all the remaining jobs are pending return true
+ canRun.removeAll(completed);
+ boolean isFinished = true;
+ for (Dag.DagNode<JobExecutionPlan> remainingNode : canRun) {
+ if (remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.RUNNING ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.PENDING_RESUME ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.ORCHESTRATED ||
+ remainingNode.getValue().getExecutionStatus() ==
ExecutionStatus.PENDING_RETRY) {
+ isFinished = false;
+ break;
+ }
+ }
+ return isFinished;
+ } else {
+ throw new RuntimeException("Unexpected failure option " + failureOption);
+ }
+ }
+
+ private static void removeChildrenFromCanRun(Dag.DagNode<JobExecutionPlan>
node, Dag<JobExecutionPlan> dag,
Review Comment:
seems this should be named `removeDescendantsFromCanRun`, since it recursively removes all descendants, not just direct children
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
Assert.assertEquals(jobSpec.getConfigAsProperties().get(key),
jobSpec.getConfig().getString(key));
}
}
+
+ @Test
+ public void testIsDagFinished() throws URISyntaxException {
+ long flowExecutionId = 12345L;
+ String flowGroup = "fg";
+ String flowName = "fn";
+
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 1, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ );
+
+ setJobStatuses(dag, Collections.singletonList(COMPLETE));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(FAILED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(CANCELLED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(RUNNING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
Review Comment:
didn't the impl also treat `ORCHESTRATED` as an unfinished status? shouldn't it be tested here too?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java:
##########
@@ -38,4 +59,299 @@ public void testGetJobSpecFromDag() throws Exception {
Assert.assertEquals(jobSpec.getConfigAsProperties().get(key),
jobSpec.getConfig().getString(key));
}
}
+
+ @Test
+ public void testIsDagFinished() throws URISyntaxException {
+ long flowExecutionId = 12345L;
+ String flowGroup = "fg";
+ String flowName = "fn";
+
+ Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 1, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ );
+
+ setJobStatuses(dag, Collections.singletonList(COMPLETE));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(FAILED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(CANCELLED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Collections.singletonList(RUNNING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ dag = DagManagerTest.buildDag("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),
+ 2, "user5", ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
+ );
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ dag = buildComplexDag2("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ dag = buildComplexDag3("1", flowExecutionId,
DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), "user5",
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ dag = buildComplexDag1("1", flowExecutionId,
+ DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(),"user5",
ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE,
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED,
COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ }
+
+ @Test
+ public void testIsDagFinishedWithFinishRunningFailureOption() throws
URISyntaxException {
+ long flowExecutionId = 12345L;
+ String flowGroup = "fg";
+ String flowName = "fn";
+ Dag<JobExecutionPlan> dag = buildComplexDag4("1", flowExecutionId,
DagManager.FailureOption.FINISH_RUNNING.name(), "user5",
+ ConfigFactory.empty()
+ .withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.FLOW_NAME_KEY,
ConfigValueFactory.fromAnyRef(flowName))
+ .withValue(ConfigurationKeys.JOB_GROUP_KEY,
ConfigValueFactory.fromAnyRef(flowGroup))
+ .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
ConfigValueFactory.fromAnyRef(
+ MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING,
PENDING));
+ Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+
+ setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING,
PENDING));
+ Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ }
+
+ private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus>
statuses) {
+ int i=0;
+ for (ExecutionStatus status : statuses) {
+ dag.getNodes().get(i++).getValue().setExecutionStatus(status);
+ }
+ }
+
+ // This creates a dag like this
+ // D0 D1 D2 D3
+ // | | | \ |
+ // D4 D5 | D6
+ // | | \|
+ // D7 | D8
+ // \ | /
+ // D9
+
+ public static Dag<JobExecutionPlan> buildComplexDag1(String id, long
flowExecutionId,
+ String flowFailureOption, String proxyUser, Config additionalConfig)
throws URISyntaxException {
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ String suffix = Integer.toString(i);
+ Config jobConfig = ConfigBuilder.create().
+ addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flow" + id).
+ addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
flowExecutionId).
+ addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
+ addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
+ addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION,
flowFailureOption).
+ addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
+ jobConfig = additionalConfig.withFallback(jobConfig);
+ if (i == 4) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job0"));
+ } else if (i == 5) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job1"));
+ } if (i == 6) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job2,job3"));
+ } else if (i == 7) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job4"));
+ } else if (i == 8) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job5,job2"));
+ } else if (i == 9) {
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES,
ConfigValueFactory.fromAnyRef("job7,job5,job8"));
+ }
+ JobSpec js = JobSpec.builder("test_job" +
suffix).withVersion(suffix).withConfig(jobConfig).
+ withTemplate(new URI("job" + suffix)).build();
+ SpecExecutor specExecutor =
MockedSpecExecutor.createDummySpecExecutor(new URI(
+ ConfigUtils.getString(additionalConfig,
ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,"job" + i)));
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js,
specExecutor);
+ jobExecutionPlans.add(jobExecutionPlan);
+ }
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
+
+ public static Dag<JobExecutionPlan> buildComplexDag2(String id, long
flowExecutionId,
Review Comment:
the picture for the one before was helpful. what about something analogous
here?
(same w/ the others that follow)
Issue Time Tracking
-------------------
Worklog Id: (was: 931648)
Time Spent: 20m (was: 10m)
> find if the dag is running or not correctly
> -------------------------------------------
>
> Key: GOBBLIN-2142
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2142
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)