This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f40bb446b3 [GOBBLIN-2150] calculate flow status correctly (#4048)
f40bb446b3 is described below
commit f40bb446b3943f9eb95055a4544b387993672f87
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Sep 4 18:53:02 2024 -0700
[GOBBLIN-2150] calculate flow status correctly (#4048)
* calculate flow status correctly
* address review comments
---
.../modules/orchestration/proc/DagProcUtils.java | 22 +++++++++-
.../orchestration/proc/ReevaluateDagProc.java | 9 +----
.../modules/orchestration/DagManagerUtilsTest.java | 47 ++++++++++++++++++----
3 files changed, 62 insertions(+), 16 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 71693705fc..da144c28cd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -27,6 +27,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
@@ -174,7 +175,7 @@ public class DagProcUtils {
log.warn("No Job future when canceling DAG node - {}",
dagNodeToCancel.getValue().getId());
}
DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(),
cancelJobArgs).get();
- sendCancellationEvent(dagNodeToCancel);
+ sendJobCancellationEvent(dagNodeToCancel);
log.info("Cancelled dag node {}, spec_producer_future {}",
dagNodeToCancel.getValue().getId(), serializedFuture);
} catch (Exception e) {
throw new IOException(e);
@@ -190,7 +191,7 @@ public class DagProcUtils {
}
}
- private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel) {
+ private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan>
dagNodeToCancel) {
JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
@@ -357,4 +358,21 @@ public class DagProcUtils {
Set<Dag.DagNode<JobExecutionPlan>> canRun) {
return node.getParentNodes() == null ||
canRun.containsAll(node.getParentNodes());
}
+
+ public static String calcFlowStatus(Dag<JobExecutionPlan> dag) {
+ Set<ExecutionStatus> jobsStatuses = dag.getNodes().stream().map(node ->
node.getValue().getExecutionStatus())
+ .collect(Collectors.toSet());
+
+ if (jobsStatuses.contains(FAILED)) {
+ return TimingEvent.FlowTimings.FLOW_FAILED;
+ } else if (jobsStatuses.contains(CANCELLED)) {
+ return TimingEvent.FlowTimings.FLOW_CANCELLED;
+ } else if (jobsStatuses.contains(PENDING_RESUME)) {
+ return TimingEvent.FlowTimings.FLOW_PENDING_RESUME;
+ } else if (jobsStatuses.stream().allMatch(jobStatus -> jobStatus ==
COMPLETE)) {
+ return TimingEvent.FlowTimings.FLOW_SUCCEEDED;
+ } else {
+ return TimingEvent.FlowTimings.FLOW_RUNNING;
+ }
+ }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index cdfa3d0509..a4afe4e198 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -115,13 +115,8 @@ public class ReevaluateDagProc extends DagProc<Pair<Optional<Dag.DagNode<JobExec
dag.setFlowEvent(null);
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode,
getDagId());
} else if (DagProcUtils.isDagFinished(dag)) {
- if (dag.getFlowEvent() == null) {
- // If the dag flow event is not set and there are no more jobs running, then it is successful
- // also note that `onJobFinish` method does whatever is required to do after job finish, determining a Dag's
- // status is not possible on individual job's finish status
- dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
- }
- String flowEvent = dag.getFlowEvent();
+ String flowEvent = DagProcUtils.calcFlowStatus(dag);
+ dag.setFlowEvent(flowEvent);
DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
// todo - verify if work from PR#3641 is required
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
index ab6509e30d..3e3d41dbe8 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
@@ -33,6 +33,7 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
@@ -73,162 +74,194 @@ public class DagManagerUtilsTest {
}
@Test
- public void testIsDagFinishedSingleNode() throws URISyntaxException {
+ public void testFlowStatusAndIsDagFinishedSingleNode() throws
URISyntaxException {
Dag<JobExecutionPlan> dag =
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 1,
proxyUser, additionalConfig);
setJobStatuses(dag, Collections.singletonList(COMPLETE));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Collections.singletonList(FAILED));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Collections.singletonList(CANCELLED));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Collections.singletonList(PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Collections.singletonList(ORCHESTRATED));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Collections.singletonList(RUNNING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag));
}
@Test
- public void testIsDagFinishedTwoNodes() throws URISyntaxException {
+ public void testFlowStatusAndIsDagFinishedTwoNodes() throws
URISyntaxException {
Dag<JobExecutionPlan> dag =
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2,
proxyUser, additionalConfig);
setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag));
}
@Test
- public void testIsDagFinishedThreeNodes() throws URISyntaxException {
+ public void testFlowStatusAndIsDagFinishedThreeNodes() throws
URISyntaxException {
Dag<JobExecutionPlan> dag = buildComplexDag3();
setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag));
}
@Test
- public void testIsDagFinishedFourNodes() throws URISyntaxException {
+ public void testFlowStatusAndIsDagFinishedFourNodes() throws
URISyntaxException {
Dag<JobExecutionPlan> dag = buildLinearDagOf4Nodes();
setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag));
}
@Test
- public void testIsDagFinishedMultiNodes() throws URISyntaxException {
+ public void testFlowStatusAndIsDagFinishedMultiNodes() throws
URISyntaxException {
Dag<JobExecutionPlan> dag = buildComplexDag1();
setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
Collections.shuffle(dag.getNodes());
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED,
DagProcUtils.calcFlowStatus(dag));
Dag<JobExecutionPlan> dag2 = buildComplexDag1();
setJobStatuses(dag2, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
Collections.shuffle(dag2.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag2));
Dag<JobExecutionPlan> dag3 = buildComplexDag1();
setJobStatuses(dag3, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE,
PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
Collections.shuffle(dag3.getNodes());
Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED,
DagProcUtils.calcFlowStatus(dag3));
Dag<JobExecutionPlan> dag4 = buildComplexDag1();
setJobStatuses(dag4, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
Collections.shuffle(dag4.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag4));
Dag<JobExecutionPlan> dag5 = buildComplexDag1();
setJobStatuses(dag5, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
Collections.shuffle(dag5.getNodes());
Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag5));
Dag<JobExecutionPlan> dag6 = buildComplexDag1();
setJobStatuses(dag6, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
Collections.shuffle(dag6.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME,
DagProcUtils.calcFlowStatus(dag6));
Dag<JobExecutionPlan> dag7 = buildComplexDag1();
setJobStatuses(dag7, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
Collections.shuffle(dag7.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag7));
Dag<JobExecutionPlan> dag8 = buildComplexDag1();
setJobStatuses(dag8, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE,
COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
Collections.shuffle(dag8.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING,
DagProcUtils.calcFlowStatus(dag8));
Dag<JobExecutionPlan> dag9 = buildComplexDag1();
setJobStatuses(dag9, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED,
COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
Collections.shuffle(dag9.getNodes());
Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED,
DagProcUtils.calcFlowStatus(dag9));
}
@Test
- public void testIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws
URISyntaxException {
+ public void
testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws
URISyntaxException {
Dag<JobExecutionPlan> dag =
DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2,
proxyUser, additionalConfig);
setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag));
}
@Test
- public void testIsDagFinishedWithFinishRunningFailureOptionMultiNodes()
throws URISyntaxException {
+ public void
testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws
URISyntaxException {
Dag<JobExecutionPlan> dag =
buildComplexDagWithFinishRunningFailureOption();
setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING,
PENDING));
Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag));
setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING,
PENDING));
Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+ Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED,
DagProcUtils.calcFlowStatus(dag));
}
private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus>
statuses) {