This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d13c3276a [GOBBLIN-1863] Multi-Active Launch Job Related Issues (#3727)
d13c3276a is described below
commit d13c3276a51ea5a6a1801eff5fd89dc8493a254a
Author: umustafi <[email protected]>
AuthorDate: Tue Aug 1 16:00:57 2023 -0700
[GOBBLIN-1863] Multi-Active Launch Job Related Issues (#3727)
* DagManager checks leader status before adding dag to avoid NPE
* handle launch events after leader change in DagManager
* Refactor Orchestrator and DagManager
* remove unnecessary specCompiler changes
* add docstring
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../apache/gobblin/runtime/api/DagActionStore.java | 5 +
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 2 +-
.../service/modules/orchestration/DagManager.java | 49 +++-
.../modules/orchestration/Orchestrator.java | 279 +++++++++++++--------
.../monitoring/DagActionStoreChangeMonitor.java | 10 +-
.../modules/orchestration/DagManagerFlowTest.java | 4 +-
6 files changed, 238 insertions(+), 111 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index a1a0ea237..3e11d7c72 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
import java.util.Collection;
import lombok.Data;
+import org.apache.gobblin.service.FlowId;
public interface DagActionStore {
@@ -40,6 +41,10 @@ public interface DagActionStore {
final String flowName;
final String flowExecutionId;
final FlowActionType flowActionType;
+
+ public FlowId getFlowId() {
+ return new
FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
+ }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 88f143a3b..2cdcf71ce 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -197,7 +197,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
- return Optional.absent();
+ return Optional.<GetEventInfoResult>absent();
}
return Optional.of(createGetInfoResult(resultSet));
} finally {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index acbf9f71d..20e802bec 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -69,8 +70,10 @@ import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -203,6 +206,8 @@ public class DagManager extends AbstractIdleService {
protected final Long defaultJobStartSlaTimeMillis;
@Getter
private final JobStatusRetriever jobStatusRetriever;
+ private final Orchestrator orchestrator;
+ private final FlowCatalog flowCatalog;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
private final long failedDagRetentionTime;
@@ -213,7 +218,8 @@ public class DagManager extends AbstractIdleService {
private volatile boolean isActive = false;
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
boolean instrumentationEnabled) {
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
Orchestrator orchestrator,
+ FlowCatalog flowCatalog, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[])
initializeDagQueue(this.numThreads);
@@ -234,6 +240,8 @@ public class DagManager extends AbstractIdleService {
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
this.jobStatusRetriever = jobStatusRetriever;
+ this.orchestrator = orchestrator;
+ this.flowCatalog = flowCatalog;
TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
this.failedDagRetentionTime =
timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME,
DEFAULT_FAILED_DAG_RETENTION_TIME));
}
@@ -258,8 +266,9 @@ public class DagManager extends AbstractIdleService {
}
@Inject
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
- this(config, jobStatusRetriever, true);
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
Orchestrator orchestrator,
+ FlowCatalog flowCatalog) {
+ this(config, jobStatusRetriever, orchestrator, flowCatalog, true);
}
/** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s
and loading of any {@link Dag}s is done
@@ -280,6 +289,10 @@ public class DagManager extends AbstractIdleService {
* Note this should only be called from the {@link Orchestrator} or {@link
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
*/
public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist,
boolean setStatus) throws IOException {
+ if (!isActive) {
+ log.warn("Skipping add dag because this instance of DagManager is not
active for dag: {}", dag);
+ return;
+ }
if (persist) {
//Persist the dag
this.dagStateStore.writeCheckpoint(dag);
@@ -433,6 +446,9 @@ public class DagManager extends AbstractIdleService {
case RESUME:
this.handleResumeFlowEvent(new
ResumeFlowEvent(action.getFlowGroup(), action.getFlowName(),
Long.parseLong(action.getFlowExecutionId())));
break;
+ case LAUNCH:
+ this.handleLaunchFlowEvent(action);
+ break;
default:
log.warn("Unsupported dagAction: " +
action.getFlowActionType().toString());
}
@@ -455,6 +471,33 @@ public class DagManager extends AbstractIdleService {
}
}
+ /**
+ * Used by the DagManager to launch a new execution for a flow action event
loaded from the DagActionStore upon
+ * setting this instance of the DagManager to active. Because it may be a
completely new DAG not contained in the
+ * dagStore, we compile the flow to generate the dag before calling
addDag(), handling any errors that may result in
+ * the process.
+ */
+ public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+ FlowId flowId = action.getFlowId();
+ FlowSpec spec;
+ try {
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+ spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
orchestrator.handleChecksBeforeExecution(spec);
+ if (optionalJobExecutionPlanDag.isPresent()) {
+ addDag(optionalJobExecutionPlanDag.get(), true, true);
+ }
+ } catch (URISyntaxException e) {
+ log.warn("Could not create URI object for flowId {} due to exception
{}", flowId, e.getMessage());
+ } catch (SpecNotFoundException e) {
+ log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
+ } catch (IOException e) {
+ log.warn("Failed to add Job Execution Plan for flowId {} due to
exception {}", flowId, e.getMessage());
+ } catch (InterruptedException e) {
+ log.warn("SpecCompiler failed to reach healthy state before compilation
of flowId {}. Exception: ", flowId, e);
+ }
+ }
+
private void loadDagFromDagStateStore() throws IOException {
List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
log.info("Loading " + dags.size() + " dags from dag state store");
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 84521087b..b059d1744 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -110,9 +110,9 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
-
- public Orchestrator(Config config, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
- FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler) {
+ public Orchestrator(Config config, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager,
+ Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean
instrumentationEnabled,
+ Optional<FlowTriggerHandler> flowTriggerHandler) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
@@ -126,10 +126,9 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
_log.info("Using specCompiler class name/alias " +
specCompilerClassName);
- this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
- specCompilerClassName)), config);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
- | ClassNotFoundException e) {
+ this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
config);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException |
+ ClassNotFoundException e) {
throw new RuntimeException(e);
}
@@ -156,7 +155,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
this.flowConcurrencyFlag = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
- ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
+ ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+ config);
}
@Inject
@@ -238,80 +238,20 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
// Only register the metric of flows that are scheduled, run once flows
should not be tracked indefinitely
if (!flowGauges.containsKey(spec.getUri().toString()) &&
flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
- String flowCompiledGaugeName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup,
flowName, ServiceMetricNames.COMPILED);
+ String flowCompiledGaugeName =
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
flowGroup, flowName, ServiceMetricNames.COMPILED);
flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
ContextAwareGauge<Integer> gauge =
RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () ->
flowGauges.get(spec.getUri().toString()).state.value);
RootMetricContext.get().register(flowCompiledGaugeName, gauge);
}
- //If the FlowSpec disallows concurrent executions, then check if another
instance of the flow is already
- //running. If so, return immediately.
- boolean allowConcurrentExecution = ConfigUtils
- .getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
-
- Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
-
- if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
- _log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
- + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
- conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
- Instrumented.markMeter(this.skippedFlowsMeter);
- if
(!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY))
{
- // For ad-hoc flow, we might already increase quota, we need to
decrease here
- for(Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
- quotaManager.releaseQuota(dagNode);
- }
- }
-
- // Send FLOW_FAILED event
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
- flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
- + "executions are disabled. Set flow.allowConcurrentExecution to
true in the flow spec to change this behaviour.");
- if (this.eventSubmitter.isPresent()) {
- new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
- }
+ if (!isExecutionPermittedHandler(flowConfig, spec, flowName, flowGroup))
{
return;
}
-
- Optional<TimingEvent> flowCompilationTimer =
this.eventSubmitter.transform(submitter ->
- new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
-
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
- if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
- // For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
- // compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
- // In this case, the current time is used as the flow executionId.
-
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
- Long.toString(System.currentTimeMillis()));
-
- String message = "Flow was not compiled successfully.";
- if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
- message = message + " Compilation errors encountered: " +
((FlowSpec) spec).getCompilationErrors();
- }
- flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
-
- Optional<TimingEvent> flowCompileFailedTimer =
this.eventSubmitter.transform(submitter ->
- new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.FAILED);
- _log.warn("Cannot determine an executor to run on for Spec: " + spec);
- if (flowCompileFailedTimer.isPresent()) {
- flowCompileFailedTimer.get().stop(flowMetadata);
- }
- return;
- }
- conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SUCCESSFUL);
-
- //If it is a scheduled flow (and hence, does not have flowExecutionId in
the FlowSpec) and the flow compilation is successful,
- // retrieve the flowExecutionId from the JobSpec.
-
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
-
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
-
- if (flowCompilationTimer.isPresent()) {
- flowCompilationTimer.get().stop(flowMetadata);
- }
// If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
+ // Skip flow compilation as well, since we recompile after receiving
event from DagActionStoreChangeMonitor later
if (flowTriggerHandler.isPresent()) {
// If triggerTimestampMillis is 0, then it was not set by the job
trigger handler, and we cannot handle this event
if (triggerTimestampMillis == 0L) {
@@ -331,38 +271,60 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis);
_log.info("Multi-active scheduler finished handling trigger event:
[{}, triggerEventTimestamp: {}]", flowAction,
triggerTimestampMillis);
- } else if (this.dagManager.isPresent()) {
- submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
} else {
- // Schedule all compiled JobSpecs on their respective Executor
- for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getNodes()) {
- DagManagerUtils.incrementJobAttempt(dagNode);
- JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-
- // Run this spec on selected executor
- SpecProducer producer = null;
- try {
- producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
- Spec jobSpec = jobExecutionPlan.getJobSpec();
-
- if (!((JobSpec)
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
- _log.warn("JobSpec does not contain flowExecutionId.");
- }
-
- Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
- _log.info(String.format("Going to orchestrate JobSpec: %s on
Executor: %s", jobSpec, producer));
-
- Optional<TimingEvent> jobOrchestrationTimer =
this.eventSubmitter.transform(submitter ->
- new TimingEvent(submitter,
TimingEvent.LauncherTimings.JOB_ORCHESTRATED));
+ // Compile flow spec and do corresponding checks
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
+ Optional<TimingEvent> flowCompilationTimer =
+ this.eventSubmitter.transform(submitter -> new
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(spec, flowMetadata);
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.FAILED);
+ _log.warn("Cannot determine an executor to run on for Spec: " +
spec);
+ return;
+ }
+ conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SUCCESSFUL);
- producer.addSpec(jobSpec);
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
- if (jobOrchestrationTimer.isPresent()) {
- jobOrchestrationTimer.get().stop(jobMetadata);
+ // Depending on if DagManager is present, handle execution
+ if (this.dagManager.isPresent()) {
+ submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
+ } else {
+ // Schedule all compiled JobSpecs on their respective Executor
+ for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getNodes()) {
+ DagManagerUtils.incrementJobAttempt(dagNode);
+ JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+
+ // Run this spec on selected executor
+ SpecProducer producer = null;
+ try {
+ producer =
jobExecutionPlan.getSpecExecutor().getProducer().get();
+ Spec jobSpec = jobExecutionPlan.getJobSpec();
+
+ if (!((JobSpec)
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ _log.warn("JobSpec does not contain flowExecutionId.");
+ }
+
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
+ _log.info(String.format("Going to orchestrate JobSpec: %s on
Executor: %s", jobSpec, producer));
+
+ Optional<TimingEvent> jobOrchestrationTimer =
this.eventSubmitter.transform(
+ submitter -> new TimingEvent(submitter,
TimingEvent.LauncherTimings.JOB_ORCHESTRATED));
+
+ producer.addSpec(jobSpec);
+
+ if (jobOrchestrationTimer.isPresent()) {
+ jobOrchestrationTimer.get().stop(jobMetadata);
+ }
+ } catch (Exception e) {
+ _log.error("Cannot successfully setup spec: " +
jobExecutionPlan.getJobSpec() + " on executor: " + producer
+ + " for flow: " + spec, e);
}
- } catch (Exception e) {
- _log.error("Cannot successfully setup spec: " +
jobExecutionPlan.getJobSpec() + " on executor: " + producer
- + " for flow: " + spec, e);
}
}
}
@@ -374,9 +336,120 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
- public void submitFlowToDagManager(FlowSpec flowSpec)
+ /**
+ * If it is a scheduled flow (and hence, does not have flowExecutionId in
the FlowSpec) and the flow compilation is
+ * successful, retrieve the flowExecutionId from the JobSpec.
+ */
+ public void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata,
Dag<JobExecutionPlan> jobExecutionPlanDag) {
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
+ ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
+
+ /**
+ * Checks if flowSpec disallows concurrent executions, and if so then checks
if another instance of the flow is
+ * already running and emits a FLOW FAILED event. Otherwise, this check
passes.
+ * @return true if caller can proceed to execute flow, false otherwise
+ * @throws IOException
+ */
+ public boolean isExecutionPermittedHandler(Config flowConfig, Spec spec,
String flowName, String flowGroup)
throws IOException {
- submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+ boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+ if (!isExecutionPermitted(flowName, flowGroup, allowConcurrentExecution)) {
+ _log.warn("Another instance of flowGroup: {}, flowName: {} running;
Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup,
flowName);
+ conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
+ Instrumented.markMeter(this.skippedFlowsMeter);
+ if (!((FlowSpec)
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ // For ad-hoc flow, we might already increase quota, we need to
decrease here
+ for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+ quotaManager.releaseQuota(dagNode);
+ }
+ }
+
+ // Send FLOW_FAILED event
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
+ + "executions are disabled. Set flow.allowConcurrentExecution to
true in the flow spec to change this behaviour.");
+ if (this.eventSubmitter.isPresent()) {
+ new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ }
+ return false;
+ }
+ return true;
+ }
+
+
+ /**
+ * Abstraction used to populate the message of and emit a FlowCompileFailed
event for the Orchestrator.
+ * @param spec
+ * @param flowMetadata
+ */
+ public void populateFlowCompilationFailedEventMessage(Spec spec,
+ Map<String, String> flowMetadata) {
+ // For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
+ // compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
+ // In this case, the current time is used as the flow executionId.
+
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+ Long.toString(System.currentTimeMillis()));
+
+ String message = "Flow was not compiled successfully.";
+ if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+ message = message + " Compilation errors encountered: " + ((FlowSpec)
spec).getCompilationErrors();
+ }
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+ Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
+ new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+ if (flowCompileFailedTimer.isPresent()) {
+ flowCompileFailedTimer.get().stop(flowMetadata);
+ }
+ }
+
+ /**
+ * For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
+ * flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
+ * caller.
+ * @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
+ */
+ public Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(FlowSpec
flowSpec)
+ throws IOException, InterruptedException {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ if (!isExecutionPermittedHandler(flowConfig, flowSpec, flowName,
flowGroup)) {
+ return Optional.absent();
+ }
+
+ //Wait for the SpecCompiler to become healthy.
+ this.getSpecCompiler().awaitHealthy();
+
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
+ Optional<TimingEvent> flowCompilationTimer =
+ this.eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ populateFlowCompilationFailedEventMessage(flowSpec, flowMetadata);
+ return Optional.absent();
+ }
+
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+ if (flowCompilationTimer.isPresent()) {
+ flowCompilationTimer.get().stop(flowMetadata);
+ }
+ return Optional.of(jobExecutionPlanDag);
+ }
+
+ public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException,
InterruptedException {
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
handleChecksBeforeExecution(flowSpec);
+ if (optionalJobExecutionPlanDag.isPresent()) {
+ submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+ }
}
public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan>
jobExecutionPlanDag)
@@ -404,7 +477,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
- private boolean canRun(String flowName, String flowGroup, boolean
allowConcurrentExecution) {
+ private boolean isExecutionPermitted(String flowName, String flowGroup,
boolean allowConcurrentExecution) {
if (allowConcurrentExecution) {
return true;
} else {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 456851612..870b68f53 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -190,15 +190,19 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
this.orchestrator.submitFlowToDagManager(spec);
} catch (URISyntaxException e) {
- log.warn("Could not create URI object for flowId {} due to error {}",
flowId, e.getMessage());
+ log.warn("Could not create URI object for flowId {}. Exception {}",
flowId, e.getMessage());
this.unexpectedErrors.mark();
return;
} catch (SpecNotFoundException e) {
- log.warn("Spec not found for flow group: {} name: {} Exception: {}",
flowGroup, flowName, e);
+ log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
this.unexpectedErrors.mark();
return;
} catch (IOException e) {
- log.warn("Failed to add Job Execution Plan for flow group: {} name: {}
due to error {}", flowGroup, flowName, e);
+ log.warn("Failed to add Job Execution Plan for flowId {} due to
exception {}", flowId, e.getMessage());
+ this.unexpectedErrors.mark();
+ return;
+ } catch (InterruptedException e) {
+ log.warn("SpecCompiler failed to reach healthy state before compilation
of flowId {}. Exception: ", flowId, e);
this.unexpectedErrors.mark();
return;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index e8c4fd443..603be2a17 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -25,6 +25,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -341,7 +342,8 @@ class CancelPredicate implements Predicate<Void> {
class MockedDagManager extends DagManager {
public MockedDagManager(Config config, boolean instrumentationEnabled) {
- super(config, createJobStatusRetriever(), instrumentationEnabled);
+ super(config, createJobStatusRetriever(),
Mockito.mock(Orchestrator.class), Mockito.mock(FlowCatalog.class),
+ instrumentationEnabled);
}
private static JobStatusRetriever createJobStatusRetriever() {