This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f40bb446b3 [GOBBLIN-2150] calculate flow status correctly (#4048)
f40bb446b3 is described below

commit f40bb446b3943f9eb95055a4544b387993672f87
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Sep 4 18:53:02 2024 -0700

    [GOBBLIN-2150] calculate flow status correctly (#4048)
    
    * calculate flow status correctly
    * address review comments
---
 .../modules/orchestration/proc/DagProcUtils.java   | 22 +++++++++-
 .../orchestration/proc/ReevaluateDagProc.java      |  9 +----
 .../modules/orchestration/DagManagerUtilsTest.java | 47 ++++++++++++++++++----
 3 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 71693705fc..da144c28cd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
@@ -174,7 +175,7 @@ public class DagProcUtils {
         log.warn("No Job future when canceling DAG node - {}", dagNodeToCancel.getValue().getId());
       }
       DagManagerUtils.getSpecProducer(dagNodeToCancel).cancelJob(dagNodeToCancel.getValue().getJobSpec().getUri(), cancelJobArgs).get();
-      sendCancellationEvent(dagNodeToCancel);
+      sendJobCancellationEvent(dagNodeToCancel);
       log.info("Cancelled dag node {}, spec_producer_future {}", dagNodeToCancel.getValue().getId(), serializedFuture);
     } catch (Exception e) {
       throw new IOException(e);
@@ -190,7 +191,7 @@ public class DagProcUtils {
     }
   }
 
-  private static void sendCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) {
+  private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNodeToCancel) {
     JobExecutionPlan jobExecutionPlan = dagNodeToCancel.getValue();
     Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
     DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
@@ -357,4 +358,21 @@ public class DagProcUtils {
       Set<Dag.DagNode<JobExecutionPlan>> canRun) {
     return node.getParentNodes() == null || canRun.containsAll(node.getParentNodes());
   }
+
+  public static String calcFlowStatus(Dag<JobExecutionPlan> dag) {
+    Set<ExecutionStatus> jobsStatuses = dag.getNodes().stream().map(node -> node.getValue().getExecutionStatus())
+        .collect(Collectors.toSet());
+
+    if (jobsStatuses.contains(FAILED)) {
+      return TimingEvent.FlowTimings.FLOW_FAILED;
+    } else if (jobsStatuses.contains(CANCELLED)) {
+      return TimingEvent.FlowTimings.FLOW_CANCELLED;
+    } else if (jobsStatuses.contains(PENDING_RESUME)) {
+      return TimingEvent.FlowTimings.FLOW_PENDING_RESUME;
+    } else if (jobsStatuses.stream().allMatch(jobStatus -> jobStatus == COMPLETE)) {
+      return TimingEvent.FlowTimings.FLOW_SUCCEEDED;
+    } else {
+      return TimingEvent.FlowTimings.FLOW_RUNNING;
+    }
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index cdfa3d0509..a4afe4e198 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -115,13 +115,8 @@ public class ReevaluateDagProc extends DagProc<Pair<Optional<Dag.DagNode<JobExec
       dag.setFlowEvent(null);
       DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, getDagId());
     } else if (DagProcUtils.isDagFinished(dag)) {
-      if (dag.getFlowEvent() == null) {
-        // If the dag flow event is not set and there are no more jobs running, then it is successful
-        // also note that `onJobFinish` method does whatever is required to do after job finish, determining a Dag's
-        // status is not possible on individual job's finish status
-        dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
-      }
-      String flowEvent = dag.getFlowEvent();
+      String flowEvent = DagProcUtils.calcFlowStatus(dag);
+      dag.setFlowEvent(flowEvent);
       DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
       if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
         // todo - verify if work from PR#3641 is required
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
index ab6509e30d..3e3d41dbe8 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtilsTest.java
@@ -33,6 +33,7 @@ import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
@@ -73,162 +74,194 @@ public class DagManagerUtilsTest {
   }
 
   @Test
-  public void testIsDagFinishedSingleNode() throws URISyntaxException {
+  public void testFlowStatusAndIsDagFinishedSingleNode() throws URISyntaxException {
     Dag<JobExecutionPlan> dag =
         DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 1, proxyUser, additionalConfig);
 
     setJobStatuses(dag, Collections.singletonList(COMPLETE));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Collections.singletonList(FAILED));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Collections.singletonList(CANCELLED));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Collections.singletonList(PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Collections.singletonList(PENDING_RETRY));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Collections.singletonList(PENDING_RESUME));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Collections.singletonList(ORCHESTRATED));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Collections.singletonList(RUNNING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
   }
 
   @Test
-  public void testIsDagFinishedTwoNodes() throws URISyntaxException {
+  public void testFlowStatusAndIsDagFinishedTwoNodes() throws URISyntaxException {
     Dag<JobExecutionPlan> dag =
         DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig);
 
     setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
   }
 
   @Test
-  public void testIsDagFinishedThreeNodes() throws URISyntaxException {
+  public void testFlowStatusAndIsDagFinishedThreeNodes() throws URISyntaxException {
     Dag<JobExecutionPlan> dag = buildComplexDag3();
 
     setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(COMPLETE, FAILED, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
   }
 
   @Test
-  public void testIsDagFinishedFourNodes() throws URISyntaxException {
+  public void testFlowStatusAndIsDagFinishedFourNodes() throws URISyntaxException {
     Dag<JobExecutionPlan> dag = buildLinearDagOf4Nodes();
 
     setJobStatuses(dag, Arrays.asList(COMPLETE, PENDING, PENDING, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(FAILED, PENDING, PENDING, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING, PENDING, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(PENDING, PENDING, PENDING, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag));
   }
 
   @Test
-  public void testIsDagFinishedMultiNodes() throws URISyntaxException {
+  public void testFlowStatusAndIsDagFinishedMultiNodes() throws URISyntaxException {
     Dag<JobExecutionPlan> dag = buildComplexDag1();
     setJobStatuses(dag, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
     Collections.shuffle(dag.getNodes());
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_SUCCEEDED, DagProcUtils.calcFlowStatus(dag));
 
     Dag<JobExecutionPlan> dag2 = buildComplexDag1();
     setJobStatuses(dag2, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
     Collections.shuffle(dag2.getNodes());
     Assert.assertFalse(DagProcUtils.isDagFinished(dag2));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag2));
 
     Dag<JobExecutionPlan> dag3 = buildComplexDag1();
     setJobStatuses(dag3, Arrays.asList(FAILED, COMPLETE, COMPLETE, COMPLETE, PENDING, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
     Collections.shuffle(dag3.getNodes());
     Assert.assertTrue(DagProcUtils.isDagFinished(dag3));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag3));
 
     Dag<JobExecutionPlan> dag4 = buildComplexDag1();
     setJobStatuses(dag4, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
     Collections.shuffle(dag4.getNodes());
     Assert.assertFalse(DagProcUtils.isDagFinished(dag4));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag4));
 
     Dag<JobExecutionPlan> dag5 = buildComplexDag1();
     setJobStatuses(dag5, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, CANCELLED, COMPLETE, COMPLETE, PENDING, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
     Collections.shuffle(dag5.getNodes());
     Assert.assertTrue(DagProcUtils.isDagFinished(dag5));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag5));
 
     Dag<JobExecutionPlan> dag6 = buildComplexDag1();
     setJobStatuses(dag6, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RESUME, COMPLETE, COMPLETE, PENDING, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
     Collections.shuffle(dag6.getNodes());
     Assert.assertFalse(DagProcUtils.isDagFinished(dag6));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_PENDING_RESUME, DagProcUtils.calcFlowStatus(dag6));
 
     Dag<JobExecutionPlan> dag7 = buildComplexDag1();
     setJobStatuses(dag7, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, PENDING_RETRY, COMPLETE, COMPLETE, PENDING, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
     Collections.shuffle(dag7.getNodes());
     Assert.assertFalse(DagProcUtils.isDagFinished(dag7));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag7));
 
     Dag<JobExecutionPlan> dag8 = buildComplexDag1();
     setJobStatuses(dag8, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, COMPLETE, COMPLETE, RUNNING, COMPLETE, COMPLETE, PENDING, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
     Collections.shuffle(dag8.getNodes());
     Assert.assertFalse(DagProcUtils.isDagFinished(dag8));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_RUNNING, DagProcUtils.calcFlowStatus(dag8));
 
     Dag<JobExecutionPlan> dag9 = buildComplexDag1();
     setJobStatuses(dag9, Arrays.asList(COMPLETE, COMPLETE, COMPLETE, FAILED, COMPLETE, COMPLETE, PENDING, COMPLETE, PENDING, COMPLETE));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
     Collections.shuffle(dag9.getNodes());
     Assert.assertFalse(DagProcUtils.isDagFinished(dag9));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag9));
   }
 
   @Test
-  public void testIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException {
+  public void testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionTwoNodes() throws URISyntaxException {
     Dag<JobExecutionPlan> dag =
         DagManagerTest.buildDag(id, flowExecutionId, flowFailureOption, 2, proxyUser, additionalConfig);
 
     setJobStatuses(dag, Arrays.asList(FAILED, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_FAILED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(CANCELLED, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
   }
 
   @Test
-  public void testIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException {
+  public void testFlowStatusAndIsDagFinishedWithFinishRunningFailureOptionMultiNodes() throws URISyntaxException {
     Dag<JobExecutionPlan> dag = buildComplexDagWithFinishRunningFailureOption();
 
     setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, PENDING, PENDING));
     Assert.assertTrue(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
 
     setJobStatuses(dag, Arrays.asList(COMPLETE, CANCELLED, COMPLETE, RUNNING, PENDING));
     Assert.assertFalse(DagProcUtils.isDagFinished(dag));
+    Assert.assertEquals(TimingEvent.FlowTimings.FLOW_CANCELLED, DagProcUtils.calcFlowStatus(dag));
   }
 
   private void setJobStatuses(Dag<JobExecutionPlan> dag, List<ExecutionStatus> statuses) {

Reply via email to