Repository: incubator-gobblin Updated Branches: refs/heads/master bac980328 -> 391615e72
[GOBBLIN-597] avoid submitting gaas job if it is already running Closes #2463 from arjun4084346/skipDuplicateGaasJob Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/391615e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/391615e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/391615e7 Branch: refs/heads/master Commit: 391615e72e5f7bdd78034447881a3e325a0288e6 Parents: bac9803 Author: Arjun <[email protected]> Authored: Mon Oct 1 21:20:23 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Oct 1 21:20:23 2018 -0700 ---------------------------------------------------------------------- .../apache/gobblin/service/ExecutionStatus.pdsc | 12 +++-- ...e.gobblin.service.flowstatuses.snapshot.json | 5 +- .../apache/gobblin/service/FlowConfigTest.java | 2 +- .../apache/gobblin/service/FlowStatusTest.java | 24 ++++++---- .../service/FlowConfigResourceLocalHandler.java | 11 ++++- .../gobblin/service/FlowConfigsResource.java | 3 +- .../gobblin/service/FlowConfigsV2Resource.java | 3 +- .../gobblin/service/FlowStatusResource.java | 50 +++++++------------- .../service/monitoring/FlowStatusGenerator.java | 7 +-- .../service/monitoring/JobStatusRetriever.java | 5 +- .../modules/core/GobblinServiceManager.java | 5 +- 11 files changed, 67 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc index 29d6a02..c87c484 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc @@ -3,7 +3,13 @@ "name" : "ExecutionStatus", "namespace" : "org.apache.gobblin.service", "doc" : "Execution status for a flow or job", - "symbols" : ["RUNNING", "FAILED", "COMPLETE"], - "symbolDocs" : {"RUNNING":"Flow or job is currently executing", "FAILED":"Flow or job failed", - "COMPLETE":"Flow or job completed execution"} + "symbols" : ["COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED", "COMPLETE"], + "symbolDocs" : { + "COMPILED":"Flow compiled to jobs.", + "ORCHESTRATED":"Job(s) orchestrated to spec executors.", + "RUNNING": "Flow or job is currently executing", + "FAILED":"Flow or job failed", + "CANCELLED":"Flow cancelled.", + "COMPLETE":"Flow or job completed execution" + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json index 99b6a1f..8912336 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json +++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json @@ -36,10 +36,13 @@ "name" : "ExecutionStatus", "namespace" : "org.apache.gobblin.service", "doc" : "Execution status for a flow or job", - "symbols" : [ "RUNNING", "FAILED", "COMPLETE" ], + "symbols" : [ "COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED", "COMPLETE" ], "symbolDocs" : { + "COMPILED" : "Flow compiled to jobs.", + "ORCHESTRATED" : "Job(s) orchestrated to spec executors.", "RUNNING" : "Flow or job is currently executing", "FAILED" : "Flow or job failed", + "CANCELLED" : "Flow cancelled.", "COMPLETE" : "Flow or job completed execution" } }, { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java index 951103c..09b8be5 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java @@ -82,7 +82,7 @@ public class FlowConfigTest { Injector injector = Guice.createInjector(new Module() { @Override public void configure(Binder binder) { - binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(new FlowConfigResourceLocalHandler(flowCatalog)); + 
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsResource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(new FlowConfigResourceLocalHandler(flowCatalog)); // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have // been made binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java index 0a5a2da..d5e857e 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java @@ -27,6 +27,7 @@ import org.testng.annotations.Test; import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.collect.Iterators; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -35,7 +36,6 @@ import com.google.inject.name.Names; import com.linkedin.restli.server.resources.BaseResource; import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.restli.EmbeddedRestliServer; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; import org.apache.gobblin.service.monitoring.JobStatusRetriever; @@ -55,6 +55,12 @@ public class 
FlowStatusTest { } @Override + public Iterator<org.apache.gobblin.service.monitoring.JobStatus> getJobStatusesForFlowExecution(String flowName, + String flowGroup, long flowExecutionId, String jobGroup, String jobName) { + return Iterators.emptyIterator(); + } + + @Override public long getLatestExecutionIdForFlow(String flowName, String flowGroup) { return _listOfJobStatusLists.size() - 1; } @@ -96,11 +102,11 @@ public class FlowStatusTest { public void testFindLatest() throws Exception { org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1") .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L) - .eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test message 1") + .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1") .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build(); org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1") .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(2000L).endTime(6000L) - .eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(1).message("Test message 2") + .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(1).message("Test message 2") .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build(); List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList1 = Lists.newArrayList(js1); List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList2 = Lists.newArrayList(js2); @@ -138,11 +144,11 @@ public class FlowStatusTest { public void testGetCompleted() throws Exception { org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1") 
.flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L) - .eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test message 1") + .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1") .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build(); org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1") .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L) - .eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test message 2") + .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2") .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build(); List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2); _listOfJobStatusLists = Lists.newArrayList(); @@ -178,11 +184,11 @@ public class FlowStatusTest { public void testGetRunning() throws Exception { org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1") .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L) - .eventName(TimingEvent.LauncherTimings.JOB_RUN).flowExecutionId(0).message("Test message 1").processedCount(100) + .eventName(ExecutionStatus.RUNNING.name()).flowExecutionId(0).message("Test message 1").processedCount(100) .jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build(); org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1") .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L) - .eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test message 2") + 
.eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 2") .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build(); List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2); _listOfJobStatusLists = Lists.newArrayList(); @@ -218,11 +224,11 @@ public class FlowStatusTest { public void testGetFailed() throws Exception { org.apache.gobblin.service.monitoring.JobStatus js1 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1") .flowName("flow1").jobGroup("jgroup1").jobName("job1").startTime(1000L).endTime(5000L) - .eventName(TimingEvent.LauncherTimings.JOB_COMPLETE).flowExecutionId(0).message("Test message 1") + .eventName(ExecutionStatus.COMPLETE.name()).flowExecutionId(0).message("Test message 1") .processedCount(100).jobExecutionId(1).lowWatermark("watermark:1").highWatermark("watermark:2").build(); org.apache.gobblin.service.monitoring.JobStatus js2 = org.apache.gobblin.service.monitoring.JobStatus.builder().flowGroup("fgroup1") .flowName("flow1").jobGroup("jgroup1").jobName("job2").startTime(2000L).endTime(6000L) - .eventName(TimingEvent.LauncherTimings.JOB_FAILED).flowExecutionId(0).message("Test message 2") + .eventName(ExecutionStatus.FAILED.name()).flowExecutionId(0).message("Test message 2") .processedCount(200).jobExecutionId(2).lowWatermark("watermark:2").highWatermark("watermark:3").build(); List<org.apache.gobblin.service.monitoring.JobStatus> jobStatusList = Lists.newArrayList(js1, js2); _listOfJobStatusLists = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java ---------------------------------------------------------------------- diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java index 52cc0c7..05a1054 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java @@ -109,8 +109,15 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerListener) throws FlowConfigLoggedException { log.info("[GAAS-REST] Create called with flowGroup " + flowConfig.getId().getFlowGroup() + " flowName " + flowConfig.getId().getFlowName()); FlowSpec flowSpec = createFlowSpecForConfig(flowConfig); - this.flowCatalog.put(flowSpec, triggerListener); - return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); + // Existence of a flow spec in the flow catalog implies that the flow is currently running. + // If the new flow spec has a schedule we should allow submission of the new flow to accept the new schedule. + // However, if the new flow spec does not have a schedule, we should allow submission only if it is not running. 
+ if (!flowConfig.hasSchedule() && this.flowCatalog.exists(flowSpec.getUri())) { + return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_409_CONFLICT); + } else { + this.flowCatalog.put(flowSpec, triggerListener); + return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java index 8c91ca7..1d0aa20 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java @@ -41,6 +41,7 @@ import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate; @RestLiCollection(name = "flowconfigs", namespace = "org.apache.gobblin.service", keyName = "id") public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, EmptyRecord, FlowConfig> { private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsResource.class); + public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = "flowConfigsResourceHandler"; private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store"); @@ -48,7 +49,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt public static 
FlowConfigsResourceHandler global_flowConfigsResourceHandler = null; @Inject - @Named("flowConfigsResourceHandler") + @Named(FLOW_CONFIG_GENERATOR_INJECT_NAME) private FlowConfigsResourceHandler flowConfigsResourceHandler; // For blocking use of this resource until it is ready http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java index 7053e39..fa6112c 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java @@ -34,6 +34,7 @@ import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate; @RestLiCollection(name = "flowconfigsV2", namespace = "org.apache.gobblin.service", keyName = "id") public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, FlowStatusId, FlowConfig> { private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsV2Resource.class); + public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = "flowConfigsV2ResourceHandler"; private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store"); @@ -41,7 +42,7 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = null; @Inject - 
@Named("flowConfigsV2ResourceHandler") + @Named(FLOW_CONFIG_GENERATOR_INJECT_NAME) private FlowConfigsResourceHandler flowConfigsResourceHandler; // For blocking use of this resource until it is ready http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java index 584c57c..a08d594 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java @@ -17,14 +17,10 @@ package org.apache.gobblin.service; -import com.linkedin.restli.server.PagingContext; -import com.linkedin.restli.server.annotations.Context; -import com.linkedin.restli.server.annotations.Finder; -import com.linkedin.restli.server.annotations.QueryParam; import java.util.Collections; import java.util.Iterator; - import java.util.List; + import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,10 +28,13 @@ import org.slf4j.LoggerFactory; import com.google.inject.Inject; import com.linkedin.restli.common.ComplexResourceKey; import com.linkedin.restli.common.EmptyRecord; +import com.linkedin.restli.server.PagingContext; +import com.linkedin.restli.server.annotations.Context; +import com.linkedin.restli.server.annotations.Finder; +import com.linkedin.restli.server.annotations.QueryParam; import 
com.linkedin.restli.server.annotations.RestLiCollection; import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate; -import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.service.monitoring.FlowStatusGenerator; @@ -120,7 +119,7 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, .setExecutionStartTime(queriedJobStatus.getStartTime()) .setExecutionEndTime(queriedJobStatus.getEndTime()) .setProcessedCount(queriedJobStatus.getProcessedCount())) - .setExecutionStatus(timingEventToStatus(queriedJobStatus.getEventName())) + .setExecutionStatus(ExecutionStatus.valueOf(queriedJobStatus.getEventName())) .setMessage(queriedJobStatus.getMessage()) .setJobState(new JobState().setLowWatermark(queriedJobStatus.getLowWatermark()). setHighWatermark(queriedJobStatus.getHighWatermark())); @@ -160,35 +159,12 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, } /** - * Maps a timing event name to a flow/job ExecutionStatus - * @param timingEvent timing event name - * @return status string - */ - private ExecutionStatus timingEventToStatus(String timingEvent) { - ExecutionStatus status; - - switch (timingEvent) { - case TimingEvent.LauncherTimings.JOB_FAILED: - case TimingEvent.LauncherTimings.JOB_CANCEL: - status = ExecutionStatus.FAILED; - break; - case TimingEvent.LauncherTimings.JOB_COMPLETE: - status = ExecutionStatus.COMPLETE; - break; - default: - status = ExecutionStatus.RUNNING; - } - - return status; - } - - /** * Determines the new flow status based on the current flow status and new job status * @param jobExecutionStatus job status * @param currentFlowExecutionStatus current flow status * @return updated flow status */ - private ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus jobExecutionStatus, + static ExecutionStatus updatedFlowExecutionStatus(ExecutionStatus jobExecutionStatus, ExecutionStatus currentFlowExecutionStatus) { // if any job failed or flow has 
failed then return failed status @@ -197,8 +173,16 @@ public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, return ExecutionStatus.FAILED; } - // if job still running then flow is still running - if (jobExecutionStatus == ExecutionStatus.RUNNING) { + // if any job is cancelled or flow has been cancelled then return cancelled status + if (currentFlowExecutionStatus == ExecutionStatus.CANCELLED || + jobExecutionStatus == ExecutionStatus.CANCELLED) { + return ExecutionStatus.CANCELLED; + } + + if (currentFlowExecutionStatus == ExecutionStatus.RUNNING || + jobExecutionStatus == ExecutionStatus.RUNNING || + jobExecutionStatus == ExecutionStatus.ORCHESTRATED || + jobExecutionStatus == ExecutionStatus.COMPILED) { return ExecutionStatus.RUNNING; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java index 5657e42..c903ba1 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java @@ -18,16 +18,11 @@ package org.apache.gobblin.service.monitoring; import java.util.Iterator; -import java.util.List; -import org.apache.commons.lang.StringUtils; - -import com.google.common.collect.Lists; +import lombok.Builder; import org.apache.gobblin.annotation.Alpha; -import lombok.Builder; - /** * Generator for {@link FlowStatus}, which relies on a {@link JobStatusRetriever}.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java index fe60c4e..03a8cf6 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java @@ -33,6 +33,9 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, long flowExecutionId); + public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup, + long flowExecutionId, String jobGroup, String jobName); + /** * Get the latest {@link JobStatus}es that belongs to the same latest flow execution. Currently, latest flow execution * is decided by comparing {@link JobStatus#getFlowExecutionId()}. @@ -40,7 +43,7 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker public Iterator<JobStatus> getLatestJobStatusByFlowNameAndGroup(String flowName, String flowGroup) { long latestExecutionId = getLatestExecutionIdForFlow(flowName, flowGroup); - return latestExecutionId == -1l ? Iterators.<JobStatus>emptyIterator() + return latestExecutionId == -1L ? 
Iterators.<JobStatus>emptyIterator() : getJobStatusesForFlowExecution(flowName, flowGroup, latestExecutionId); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/391615e7/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 9c1b42f..8d934ce 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -81,6 +81,7 @@ import org.apache.gobblin.service.FlowConfigResourceLocalHandler; import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler; import org.apache.gobblin.service.FlowConfigsResource; import org.apache.gobblin.service.FlowConfigsResourceHandler; +import org.apache.gobblin.service.FlowConfigsV2Resource; import org.apache.gobblin.service.FlowId; import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; @@ -235,8 +236,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri Injector injector = Guice.createInjector(new Module() { @Override public void configure(Binder binder) { - binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsResourceHandler")).toInstance(GobblinServiceManager.this.resourceHandler); - binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named("flowConfigsV2ResourceHandler")).toInstance(GobblinServiceManager.this.v2ResourceHandler); + binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsResource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(GobblinServiceManager.this.resourceHandler); + 
binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(GobblinServiceManager.this.v2ResourceHandler); binder.bindConstant().annotatedWith(Names.named("readyToUse")).to(Boolean.TRUE); } });
