Repository: incubator-gobblin
Updated Branches:
  refs/heads/master bac980328 -> 391615e72


[GOBBLIN-597] avoid submitting gaas job if it is already running

Closes #2463 from
arjun4084346/skipDuplicateGaasJob


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/391615e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/391615e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/391615e7

Branch: refs/heads/master
Commit: 391615e72e5f7bdd78034447881a3e325a0288e6
Parents: bac9803
Author: Arjun <[email protected]>
Authored: Mon Oct 1 21:20:23 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Oct 1 21:20:23 2018 -0700

----------------------------------------------------------------------
 .../apache/gobblin/service/ExecutionStatus.pdsc | 12 +++--
 ...e.gobblin.service.flowstatuses.snapshot.json |  5 +-
 .../apache/gobblin/service/FlowConfigTest.java  |  2 +-
 .../apache/gobblin/service/FlowStatusTest.java  | 24 ++++++----
 .../service/FlowConfigResourceLocalHandler.java | 11 ++++-
 .../gobblin/service/FlowConfigsResource.java    |  3 +-
 .../gobblin/service/FlowConfigsV2Resource.java  |  3 +-
 .../gobblin/service/FlowStatusResource.java     | 50 +++++++-------------
 .../service/monitoring/FlowStatusGenerator.java |  7 +--
 .../service/monitoring/JobStatusRetriever.java  |  5 +-
 .../modules/core/GobblinServiceManager.java     |  5 +-
 11 files changed, 67 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
index 29d6a02..c87c484 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
@@ -3,7 +3,13 @@
   "name" : "ExecutionStatus",
   "namespace" : "org.apache.gobblin.service",
   "doc" : "Execution status for a flow or job",
-  "symbols" : ["RUNNING", "FAILED", "COMPLETE"],
-  "symbolDocs" : {"RUNNING":"Flow or job is currently executing", 
"FAILED":"Flow or job failed",
-    "COMPLETE":"Flow or job completed execution"}
+  "symbols" : ["COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED", 
"COMPLETE"],
+  "symbolDocs" : {
+    "COMPILED":"Flow compiled to jobs.",
+    "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
+    "RUNNING": "Flow or job is currently executing",
+    "FAILED":"Flow or job failed",
+    "CANCELLED":"Flow cancelled.",
+    "COMPLETE":"Flow or job completed execution"
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 99b6a1f..8912336 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -36,10 +36,13 @@
     "name" : "ExecutionStatus",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Execution status for a flow or job",
-    "symbols" : [ "RUNNING", "FAILED", "COMPLETE" ],
+    "symbols" : [ "COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", 
"CANCELLED", "COMPLETE" ],
     "symbolDocs" : {
+      "COMPILED" : "Flow compiled to jobs.",
+      "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
       "RUNNING" : "Flow or job is currently executing",
       "FAILED" : "Flow or job failed",
+      "CANCELLED" : "Flow cancelled.",
       "COMPLETE" : "Flow or job completed execution"
     }
   }, {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 951103c..09b8be5 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -82,7 +82,7 @@ public class FlowConfigTest {
     Injector injector = Guice.createInjector(new Module() {
        @Override
        public void configure(Binder binder) {
-         
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(new
 FlowConfigResourceLocalHandler(flowCatalog));
+         
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsResource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(new
 FlowConfigResourceLocalHandler(flowCatalog));
          // indicate that we are in unit testing since the resource is being 
blocked until flow catalog changes have
          // been made
          
binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index 0a5a2da..d5e857e 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -27,6 +27,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Iterators;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -35,7 +36,6 @@ import com.google.inject.name.Names;
 import com.linkedin.restli.server.resources.BaseResource;
 
 import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -55,6 +55,12 @@ public class FlowStatusTest {
     }
 
     @Override
+    public Iterator<org.apache.gobblin.service.monitoring.JobStatus> 
getJobStatusesForFlowExecution(String flowName,
+        String flowGroup, long flowExecutionId, String jobGroup, String 
jobName) {
+      return Iterators.emptyIterator();
+    }
+
+    @Override
     public long getLatestExecutionIdForFlow(String flowName, String flowGroup) 
{
       return _listOfJobStatusLists.size() - 1;
     }
@@ -96,11 +102,11 @@ public class FlowStatusTest {
   public void testFindLatest() throws Exception {
     org.apache.gobblin.service.monitoring.JobStatus js1 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
-        
.eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test
 message 1")
+        
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test 
message 1")
         
.processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
     org.apache.gobblin.service.monitoring.JobStatus js2 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(2000L).endTime(6000L)
-        
.eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(1).message("Test
 message 2")
+        
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test 
message 2")
         
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = 
Lists.newArrayList(js1);
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = 
Lists.newArrayList(js2);
@@ -138,11 +144,11 @@ public class FlowStatusTest {
   public void testGetCompleted() throws Exception {
     org.apache.gobblin.service.monitoring.JobStatus js1 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
-        
.eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test
 message 1")
+        
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test 
message 1")
         
.processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
     org.apache.gobblin.service.monitoring.JobStatus js2 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
-        
.eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test
 message 2")
+        
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test 
message 2")
         
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = 
Lists.newArrayList(js1, js2);
     _listOfJobStatusLists = Lists.newArrayList();
@@ -178,11 +184,11 @@ public class FlowStatusTest {
   public void testGetRunning() throws Exception {
     org.apache.gobblin.service.monitoring.JobStatus js1 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
-        
.eventName(TimingEvent.LauncherTimings.JOB_RUN).flowExecutionId(0).message("Test
 message 1").processedCount(100)
+        
.eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Test 
message 1").processedCount(100)
         
.jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
     org.apache.gobblin.service.monitoring.JobStatus js2 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
-        
.eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test
 message 2")
+        
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test 
message 2")
         
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = 
Lists.newArrayList(js1, js2);
     _listOfJobStatusLists = Lists.newArrayList();
@@ -218,11 +224,11 @@ public class FlowStatusTest {
   public void testGetFailed() throws Exception {
     org.apache.gobblin.service.monitoring.JobStatus js1 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L)
-        
.eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test
 message 1")
+        
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test 
message 1")
         
.processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build();
     org.apache.gobblin.service.monitoring.JobStatus js2 = 
org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1")
         
.flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L)
-        
.eventName(TimingEvent.LauncherTimings.JOB_FAILED).flowExecutionId(0).message("Test
 message 2")
+        
.eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test 
message 2")
         
.processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build();
     List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = 
Lists.newArrayList(js1, js2);
     _listOfJobStatusLists = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 52cc0c7..05a1054 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -109,8 +109,15 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
   public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean 
triggerListener) throws FlowConfigLoggedException {
     log.info("[GAAS-REST] Create called with flowGroup " + 
flowConfig.getId().getFlowGroup() + " flowName " + 
flowConfig.getId().getFlowName());
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
-    this.flowCatalog.put(flowSpec, triggerListener);
-    return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new 
EmptyRecord()), HttpStatus.S_201_CREATED);
+    // Existence of a flow spec in the flow catalog implies that the flow is 
currently running.
+    // If the new flow spec has a schedule we should allow submission of the 
new flow to accept the new schedule.
+    // However, if the new flow spec does not have a schedule, we should allow 
submission only if it is not running.
+    if (!flowConfig.hasSchedule() && 
this.flowCatalog.exists(flowSpec.getUri())) {
+      return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), 
new EmptyRecord()), HttpStatus.S_409_CONFLICT);
+    } else {
+      this.flowCatalog.put(flowSpec, triggerListener);
+      return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), 
new EmptyRecord()), HttpStatus.S_201_CREATED);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index 8c91ca7..1d0aa20 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -41,6 +41,7 @@ import 
com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 @RestLiCollection(name = "flowconfigs", namespace = 
"org.apache.gobblin.service", keyName = "id")
 public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, 
EmptyRecord, FlowConfig> {
   private static final Logger LOG = 
LoggerFactory.getLogger(FlowConfigsResource.class);
+  public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = 
"flowConfigsResourceHandler";
   private static final Set<String> ALLOWED_METADATA = 
ImmutableSet.of("delete.state.store");
 
 
@@ -48,7 +49,7 @@ public class FlowConfigsResource extends 
ComplexKeyResourceTemplate<FlowId, Empt
   public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = 
null;
 
   @Inject
-  @Named("flowConfigsResourceHandler")
+  @Named(FLOW_CONFIG_GENERATOR_INJECT_NAME)
   private FlowConfigsResourceHandler flowConfigsResourceHandler;
 
   // For blocking use of this resource until it is ready

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 7053e39..fa6112c 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -34,6 +34,7 @@ import 
com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 @RestLiCollection(name = "flowconfigsV2", namespace = 
"org.apache.gobblin.service", keyName = "id")
 public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, 
FlowStatusId, FlowConfig> {
   private static final Logger LOG = 
LoggerFactory.getLogger(FlowConfigsV2Resource.class);
+  public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = 
"flowConfigsV2ResourceHandler";
   private static final Set<String> ALLOWED_METADATA = 
ImmutableSet.of("delete.state.store");
 
 
@@ -41,7 +42,7 @@ public class FlowConfigsV2Resource extends 
ComplexKeyResourceTemplate<FlowId, Fl
   public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = 
null;
 
   @Inject
-  @Named("flowConfigsV2ResourceHandler")
+  @Named(FLOW_CONFIG_GENERATOR_INJECT_NAME)
   private FlowConfigsResourceHandler flowConfigsResourceHandler;
 
   // For blocking use of this resource until it is ready

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 584c57c..a08d594 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -17,14 +17,10 @@
 
 package org.apache.gobblin.service;
 
-import com.linkedin.restli.server.PagingContext;
-import com.linkedin.restli.server.annotations.Context;
-import com.linkedin.restli.server.annotations.Finder;
-import com.linkedin.restli.server.annotations.QueryParam;
 import java.util.Collections;
 import java.util.Iterator;
-
 import java.util.List;
+
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,10 +28,13 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.Inject;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.server.PagingContext;
+import com.linkedin.restli.server.annotations.Context;
+import com.linkedin.restli.server.annotations.Finder;
+import com.linkedin.restli.server.annotations.QueryParam;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 
-import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 
 
@@ -120,7 +119,7 @@ public class FlowStatusResource extends 
ComplexKeyResourceTemplate<FlowStatusId,
               .setExecutionStartTime(queriedJobStatus.getStartTime())
               .setExecutionEndTime(queriedJobStatus.getEndTime())
               .setProcessedCount(queriedJobStatus.getProcessedCount()))
-          
.setExecutionStatus(timingEventToStatus(queriedJobStatus.getEventName()))
+          
.setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName()))
           .setMessage(queriedJobStatus.getMessage())
           .setJobState(new 
JobState().setLowWatermark(queriedJobStatus.getLowWatermark()).
               setHighWatermark(queriedJobStatus.getHighWatermark()));
@@ -160,35 +159,12 @@ public class FlowStatusResource extends 
ComplexKeyResourceTemplate<FlowStatusId,
   }
 
   /**
-   * Maps a timing event name to a flow/job ExecutionStatus
-   * @param timingEvent timing event name
-   * @return status string
-   */
-  private ExecutionStatus timingEventToStatus(String timingEvent) {
-    ExecutionStatus status;
-
-    switch (timingEvent) {
-      case TimingEvent.LauncherTimings.JOB_FAILED:
-      case TimingEvent.LauncherTimings.JOB_CANCEL:
-        status = ExecutionStatus.FAILED;
-        break;
-      case TimingEvent.LauncherTimings.JOB_COMPLETE:
-        status = ExecutionStatus.COMPLETE;
-        break;
-      default:
-        status = ExecutionStatus.RUNNING;
-    }
-
-    return status;
-  }
-
-  /**
    * Determines the new flow status based on the current flow status and new 
job status
    * @param jobExecutionStatus job status
    * @param currentFlowExecutionStatus current flow status
    * @return updated flow status
    */
-  private ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus 
jobExecutionStatus,
+  static ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus 
jobExecutionStatus,
       ExecutionStatus currentFlowExecutionStatus) {
 
     // if any job failed or flow has failed then return failed status
@@ -197,8 +173,16 @@ public class FlowStatusResource extends 
ComplexKeyResourceTemplate<FlowStatusId,
       return ExecutionStatus.FAILED;
     }
 
-    // if job still running then flow is still running
-    if (jobExecutionStatus == ExecutionStatus.RUNNING) {
+    // if any job is cancelled or flow has been cancelled then return cancelled status
+    if (currentFlowExecutionStatus == ExecutionStatus.CANCELLED ||
+        jobExecutionStatus == ExecutionStatus.CANCELLED) {
+      return ExecutionStatus.CANCELLED;
+    }
+
+    if (currentFlowExecutionStatus == ExecutionStatus.RUNNING ||
+        jobExecutionStatus == ExecutionStatus.RUNNING ||
+        jobExecutionStatus == ExecutionStatus.ORCHESTRATED ||
+        jobExecutionStatus == ExecutionStatus.COMPILED) {
       return ExecutionStatus.RUNNING;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 5657e42..c903ba1 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -18,16 +18,11 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.util.Iterator;
-import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
-
-import com.google.common.collect.Lists;
+import lombok.Builder;
 
 import org.apache.gobblin.annotation.Alpha;
 
-import lombok.Builder;
-
 
 /**
  * Generator for {@link FlowStatus}, which relies on a {@link 
JobStatusRetriever}.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index fe60c4e..03a8cf6 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -33,6 +33,9 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String 
flowName, String flowGroup,
       long flowExecutionId);
 
+  public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String 
flowName, String flowGroup,
+      long flowExecutionId, String jobGroup, String jobName);
+
   /**
    * Get the latest {@link JobStatus}es that belongs to the same latest flow 
execution. Currently, latest flow execution
    * is decided by comparing {@link JobStatus#getFlowExecutionId()}.
@@ -40,7 +43,7 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
   public Iterator<JobStatus> getLatestJobStatusByFlowNameAndGroup(String 
flowName, String flowGroup) {
     long latestExecutionId = getLatestExecutionIdForFlow(flowName, flowGroup);
 
-    return latestExecutionId == -1l ? Iterators.<JobStatus>emptyIterator()
+    return latestExecutionId == -1L ? Iterators.<JobStatus>emptyIterator()
         : getJobStatusesForFlowExecution(flowName, flowGroup, 
latestExecutionId);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 9c1b42f..8d934ce 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -81,6 +81,7 @@ import 
org.apache.gobblin.service.FlowConfigResourceLocalHandler;
 import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
 import org.apache.gobblin.service.FlowConfigsResource;
 import org.apache.gobblin.service.FlowConfigsResourceHandler;
+import org.apache.gobblin.service.FlowConfigsV2Resource;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -235,8 +236,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       Injector injector = Guice.createInjector(new Module() {
         @Override
         public void configure(Binder binder) {
-          
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(GobblinServiceManager.this.resourceHandler);
-          
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(GobblinServiceManager.this.v2ResourceHandler);
+          
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsResource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(GobblinServiceManager.this.resourceHandler);
+          
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(GobblinServiceManager.this.v2ResourceHandler);
           
binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE);
         }
       });

Reply via email to