This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eb8a27  [GOBBLIN-868] Check flow status instead of job status to determine if flow is running
4eb8a27 is described below

commit 4eb8a276e718fb199ed75848657ec9ae4fdf456c
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Sep 6 13:34:26 2019 -0700

    [GOBBLIN-868] Check flow status instead of job status to determine if flow is running
    
    Closes #2723 from jack-moseley/flowstatus-fix
---
 .../org/apache/gobblin/service/FlowStatusResource.java     | 10 +---------
 .../gobblin/service/monitoring/FlowStatusGenerator.java    |  8 ++++----
 .../gobblin/service/monitoring/JobStatusRetriever.java     |  9 +++++++++
 .../service/monitoring/FlowStatusGeneratorTest.java        | 14 ++++++++------
 4 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 70957c6..9cd9508 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -117,7 +117,7 @@ public class FlowStatusResource extends 
ComplexKeyResourceTemplate<FlowStatusId,
       org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus = 
jobStatusIter.next();
 
       // Check if this is the flow status instead of a single job status
-      if (isFlowStatus(queriedJobStatus)) {
+      if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
         flowEndTime = queriedJobStatus.getEndTime();
         flowExecutionStatus = 
ExecutionStatus.valueOf(queriedJobStatus.getEventName());
         continue;
@@ -160,14 +160,6 @@ public class FlowStatusResource extends 
ComplexKeyResourceTemplate<FlowStatusId,
   }
 
   /**
-   * Check if a {@link org.apache.gobblin.service.monitoring.JobStatus} is the 
special job status that represents the
-   * entire flow's status
-   */
-  private static boolean 
isFlowStatus(org.apache.gobblin.service.monitoring.JobStatus jobStatus) {
-    return jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && 
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
-  }
-
-  /**
    * Return the flow start time given a {@link 
org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
    * assumed to be the flow start time.
    */
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index f9e1005..b334c3e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -34,7 +34,7 @@ import org.apache.gobblin.annotation.Alpha;
 @Alpha
 @Builder
 public class FlowStatusGenerator {
-  public static final List<String> FINISHED_JOB_STATUSES = 
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
+  public static final List<String> FINISHED_STATUSES = 
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
 
   private final JobStatusRetriever jobStatusRetriever;
 
@@ -94,8 +94,8 @@ public class FlowStatusGenerator {
 
       while (jobStatusIterator.hasNext()) {
         JobStatus jobStatus = jobStatusIterator.next();
-        if (isJobRunning(jobStatus)) {
-          return true;
+        if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+          return isJobRunning(jobStatus);
         }
       }
       return false;
@@ -108,6 +108,6 @@ public class FlowStatusGenerator {
    */
   private boolean isJobRunning(JobStatus jobStatus) {
     String status = jobStatus.getEventName().toUpperCase();
-    return !FINISHED_JOB_STATUSES.contains(status);
+    return !FINISHED_STATUSES.contains(status);
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index ebc48fe..09068d6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -91,4 +91,13 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
   }
 
   public abstract StateStore<State> getStateStore();
+
+  /**
+   * Check if a {@link org.apache.gobblin.service.monitoring.JobStatus} is the 
special job status that represents the
+   * entire flow's status
+   */
+  public static boolean 
isFlowStatus(org.apache.gobblin.service.monitoring.JobStatus jobStatus) {
+    return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
+        && jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && 
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
+  }
 }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index e4bc4bd..4515e51 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -42,12 +42,12 @@ public class FlowStatusGeneratorTest {
     Mockito.when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, 
flowGroup, 1)).thenReturn(
         Lists.newArrayList(flowExecutionId));
     JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
-        .jobName(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+        
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
     Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
     Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
     Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
 
-    //A flow with 3 JobStatus-es with status COMPLETE, FAILED and CANCELLED => 
isFlowRunning() should return false.
+    //JobStatuses should be ignored, only the flow level status matters.
     String job1 = "job1";
     String job2 = "job2";
     String job3 = "job3";
@@ -57,13 +57,15 @@ public class FlowStatusGeneratorTest {
         .jobName(job2).eventName("FAILED").build();
     JobStatus jobStatus3 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
         .jobName(job3).eventName("CANCELLED").build();
-    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, 
jobStatus3).iterator();
+    JobStatus flowStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
+    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, 
flowStatus).iterator();
     Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
     Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
 
-    jobStatus2 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
-        .jobName(job2).eventName("RUNNING").build();
-    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, 
jobStatus3).iterator();
+    flowStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
+    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, 
flowStatus).iterator();
     Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
     Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
   }

Reply via email to