This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4eb8a27 [GOBBLIN-868] Check flow status instead of job status to determine if flow is running
4eb8a27 is described below
commit 4eb8a276e718fb199ed75848657ec9ae4fdf456c
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Sep 6 13:34:26 2019 -0700
[GOBBLIN-868] Check flow status instead of job status to determine if flow is running
Closes #2723 from jack-moseley/flowstatus-fix
---
.../org/apache/gobblin/service/FlowStatusResource.java | 10 +---------
.../gobblin/service/monitoring/FlowStatusGenerator.java | 8 ++++----
.../gobblin/service/monitoring/JobStatusRetriever.java | 9 +++++++++
.../service/monitoring/FlowStatusGeneratorTest.java | 14 ++++++++------
4 files changed, 22 insertions(+), 19 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 70957c6..9cd9508 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -117,7 +117,7 @@ public class FlowStatusResource extends
ComplexKeyResourceTemplate<FlowStatusId,
org.apache.gobblin.service.monitoring.JobStatus queriedJobStatus =
jobStatusIter.next();
// Check if this is the flow status instead of a single job status
- if (isFlowStatus(queriedJobStatus)) {
+ if (JobStatusRetriever.isFlowStatus(queriedJobStatus)) {
flowEndTime = queriedJobStatus.getEndTime();
flowExecutionStatus =
ExecutionStatus.valueOf(queriedJobStatus.getEventName());
continue;
@@ -160,14 +160,6 @@ public class FlowStatusResource extends
ComplexKeyResourceTemplate<FlowStatusId,
}
/**
- * Check if a {@link org.apache.gobblin.service.monitoring.JobStatus} is the special job status that represents the
- * entire flow's status
- */
- private static boolean
isFlowStatus(org.apache.gobblin.service.monitoring.JobStatus jobStatus) {
- return jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) &&
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
- }
-
- /**
* Return the flow start time given a {@link
org.apache.gobblin.service.monitoring.FlowStatus}. Flow execution ID is
* assumed to be the flow start time.
*/
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index f9e1005..b334c3e 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -34,7 +34,7 @@ import org.apache.gobblin.annotation.Alpha;
@Alpha
@Builder
public class FlowStatusGenerator {
- public static final List<String> FINISHED_JOB_STATUSES =
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
+ public static final List<String> FINISHED_STATUSES =
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
private final JobStatusRetriever jobStatusRetriever;
@@ -94,8 +94,8 @@ public class FlowStatusGenerator {
while (jobStatusIterator.hasNext()) {
JobStatus jobStatus = jobStatusIterator.next();
- if (isJobRunning(jobStatus)) {
- return true;
+ if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+ return isJobRunning(jobStatus);
}
}
return false;
@@ -108,6 +108,6 @@ public class FlowStatusGenerator {
*/
private boolean isJobRunning(JobStatus jobStatus) {
String status = jobStatus.getEventName().toUpperCase();
- return !FINISHED_JOB_STATUSES.contains(status);
+ return !FINISHED_STATUSES.contains(status);
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index ebc48fe..09068d6 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -91,4 +91,13 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
}
public abstract StateStore<State> getStateStore();
+
+ /**
+ * Check if a {@link org.apache.gobblin.service.monitoring.JobStatus} is the special job status that represents the
+ * entire flow's status
+ */
+ public static boolean
isFlowStatus(org.apache.gobblin.service.monitoring.JobStatus jobStatus) {
+ return jobStatus.getJobName() != null && jobStatus.getJobGroup() != null
+ && jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) &&
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
+ }
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index e4bc4bd..4515e51 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -42,12 +42,12 @@ public class FlowStatusGeneratorTest {
Mockito.when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName,
flowGroup, 1)).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
- .jobName(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
Iterator<JobStatus> jobStatusIterator =
Lists.newArrayList(jobStatus).iterator();
Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
- //A flow with 3 JobStatus-es with status COMPLETE, FAILED and CANCELLED => isFlowRunning() should return false.
+ //JobStatuses should be ignored, only the flow level status matters.
String job1 = "job1";
String job2 = "job2";
String job3 = "job3";
@@ -57,13 +57,15 @@ public class FlowStatusGeneratorTest {
.jobName(job2).eventName("FAILED").build();
JobStatus jobStatus3 =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(job3).eventName("CANCELLED").build();
- jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2,
jobStatus3).iterator();
+ JobStatus flowStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("CANCELLED").build();
+ jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3,
flowStatus).iterator();
Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
- jobStatus2 =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
- .jobName(job2).eventName("RUNNING").build();
- jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2,
jobStatus3).iterator();
+ flowStatus =
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
+ jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3,
flowStatus).iterator();
Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
}