This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d13c3276a [GOBBLIN-1863] Multi-Active Launch Job Related Issues (#3727)
d13c3276a is described below

commit d13c3276a51ea5a6a1801eff5fd89dc8493a254a
Author: umustafi <[email protected]>
AuthorDate: Tue Aug 1 16:00:57 2023 -0700

    [GOBBLIN-1863] Multi-Active Launch Job Related Issues (#3727)
    
    * DagManager checks leader status before adding dag to avoid NPE
    
    * handle launch events after leader change in DagManager
    
    * Refactor Orchestrator and DagManager
    
    * remove unnecessary specCompiler changes
    
    * add docstring
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../apache/gobblin/runtime/api/DagActionStore.java |   5 +
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  |   2 +-
 .../service/modules/orchestration/DagManager.java  |  49 +++-
 .../modules/orchestration/Orchestrator.java        | 279 +++++++++++++--------
 .../monitoring/DagActionStoreChangeMonitor.java    |  10 +-
 .../modules/orchestration/DagManagerFlowTest.java  |   4 +-
 6 files changed, 238 insertions(+), 111 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index a1a0ea237..3e11d7c72 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
 import java.util.Collection;
 
 import lombok.Data;
+import org.apache.gobblin.service.FlowId;
 
 
 public interface DagActionStore {
@@ -40,6 +41,10 @@ public interface DagActionStore {
     final String flowName;
     final String flowExecutionId;
     final FlowActionType flowActionType;
+
+    public FlowId getFlowId() {
+      return new 
FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
+    }
   }
 
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 88f143a3b..2cdcf71ce 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -197,7 +197,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           ResultSet resultSet = getInfoStatement.executeQuery();
           try {
             if (!resultSet.next()) {
-              return Optional.absent();
+              return Optional.<GetEventInfoResult>absent();
             }
             return Optional.of(createGetInfoResult(resultSet));
           } finally {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index acbf9f71d..20e802bec 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -69,8 +70,10 @@ import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -203,6 +206,8 @@ public class DagManager extends AbstractIdleService {
   protected final Long defaultJobStartSlaTimeMillis;
   @Getter
   private final JobStatusRetriever jobStatusRetriever;
+  private final Orchestrator orchestrator;
+  private final FlowCatalog flowCatalog;
   private final Config config;
   private final Optional<EventSubmitter> eventSubmitter;
   private final long failedDagRetentionTime;
@@ -213,7 +218,8 @@ public class DagManager extends AbstractIdleService {
 
   private volatile boolean isActive = false;
 
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, 
boolean instrumentationEnabled) {
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, 
Orchestrator orchestrator,
+      FlowCatalog flowCatalog, boolean instrumentationEnabled) {
     this.config = config;
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
     this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) 
initializeDagQueue(this.numThreads);
@@ -234,6 +240,8 @@ public class DagManager extends AbstractIdleService {
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
+    this.orchestrator = orchestrator;
+    this.flowCatalog = flowCatalog;
     TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
     this.failedDagRetentionTime = 
timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, 
DEFAULT_FAILED_DAG_RETENTION_TIME));
   }
@@ -258,8 +266,9 @@ public class DagManager extends AbstractIdleService {
   }
 
   @Inject
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
-    this(config, jobStatusRetriever, true);
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, 
Orchestrator orchestrator,
+      FlowCatalog flowCatalog) {
+    this(config, jobStatusRetriever, orchestrator, flowCatalog, true);
   }
 
   /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s 
and loading of any {@link Dag}s is done
@@ -280,6 +289,10 @@ public class DagManager extends AbstractIdleService {
    * Note this should only be called from the {@link Orchestrator} or {@link 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
    */
   public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, 
boolean setStatus) throws IOException {
+    if (!isActive) {
+      log.warn("Skipping add dag because this instance of DagManager is not 
active for dag: {}", dag);
+      return;
+    }
     if (persist) {
       //Persist the dag
       this.dagStateStore.writeCheckpoint(dag);
@@ -433,6 +446,9 @@ public class DagManager extends AbstractIdleService {
               case RESUME:
                 this.handleResumeFlowEvent(new 
ResumeFlowEvent(action.getFlowGroup(), action.getFlowName(), 
Long.parseLong(action.getFlowExecutionId())));
                 break;
+              case LAUNCH:
+                this.handleLaunchFlowEvent(action);
+                break;
               default:
                 log.warn("Unsupported dagAction: " + 
action.getFlowActionType().toString());
             }
@@ -455,6 +471,33 @@ public class DagManager extends AbstractIdleService {
     }
   }
 
+  /**
+   * Used by the DagManager to launch a new execution for a flow action event 
loaded from the DagActionStore upon
+   * setting this instance of the DagManager to active. Because it may be a 
completely new DAG not contained in the
+   * dagStore, we compile the flow to generate the dag before calling 
addDag(), handling any errors that may result in
+   * the process.
+   */
+  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
+    FlowId flowId = action.getFlowId();
+    FlowSpec spec;
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag = 
orchestrator.handleChecksBeforeExecution(spec);
+      if (optionalJobExecutionPlanDag.isPresent()) {
+        addDag(optionalJobExecutionPlanDag.get(), true, true);
+      }
+    } catch (URISyntaxException e) {
+      log.warn("Could not create URI object for flowId {} due to exception 
{}", flowId, e.getMessage());
+    } catch (SpecNotFoundException e) {
+      log.warn("Spec not found for flowId {} due to exception {}", flowId, 
e.getMessage());
+    } catch (IOException e) {
+      log.warn("Failed to add Job Execution Plan for flowId {} due to 
exception {}", flowId, e.getMessage());
+    } catch (InterruptedException e) {
+      log.warn("SpecCompiler failed to reach healthy state before compilation 
of flowId {}. Exception: ", flowId, e);
+    }
+  }
+
   private void loadDagFromDagStateStore() throws IOException {
     List<Dag<JobExecutionPlan>> dags = dagStateStore.getDags();
     log.info("Loading " + dags.size() + " dags from dag state store");
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 84521087b..b059d1744 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -110,9 +110,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
 
-
-  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
-      FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled, 
Optional<FlowTriggerHandler> flowTriggerHandler) {
+  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager,
+      Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean 
instrumentationEnabled,
+      Optional<FlowTriggerHandler> flowTriggerHandler) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
@@ -126,10 +126,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       }
       _log.info("Using specCompiler class name/alias " + 
specCompilerClassName);
 
-      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
-          specCompilerClassName)), config);
-    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
-        | ClassNotFoundException e) {
+      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
 config);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException |
+             ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
 
@@ -156,7 +155,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
         ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
     quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
-        ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
+        ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+        config);
   }
 
   @Inject
@@ -238,80 +238,20 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
       // Only register the metric of flows that are scheduled, run once flows 
should not be tracked indefinitely
       if (!flowGauges.containsKey(spec.getUri().toString()) && 
flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-        String flowCompiledGaugeName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, 
flowName, ServiceMetricNames.COMPILED);
+        String flowCompiledGaugeName =
+            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
flowGroup, flowName, ServiceMetricNames.COMPILED);
         flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
         ContextAwareGauge<Integer> gauge = 
RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () -> 
flowGauges.get(spec.getUri().toString()).state.value);
         RootMetricContext.get().register(flowCompiledGaugeName, gauge);
       }
 
-      //If the FlowSpec disallows concurrent executions, then check if another 
instance of the flow is already
-      //running. If so, return immediately.
-      boolean allowConcurrentExecution = ConfigUtils
-          .getBoolean(flowConfig, 
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
-
-      Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
-
-      if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
-        _log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
-            + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
-        conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
-        Instrumented.markMeter(this.skippedFlowsMeter);
-        if 
(!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY))
 {
-          // For ad-hoc flow, we might already increase quota, we need to 
decrease here
-          for(Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
-            quotaManager.releaseQuota(dagNode);
-          }
-        }
-
-        // Send FLOW_FAILED event
-        Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
-        flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
-            + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flow spec to change this behaviour.");
-        if (this.eventSubmitter.isPresent()) {
-          new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
-        }
+      if (!isExecutionPermittedHandler(flowConfig, spec, flowName, flowGroup)) 
{
         return;
       }
-
-      Optional<TimingEvent> flowCompilationTimer = 
this.eventSubmitter.transform(submitter ->
-          new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
-
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
-      if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
-        // For scheduled flows, we do not insert the flowExecutionId into the 
FlowSpec. As a result, if the flow
-        // compilation fails (i.e. we are unable to find a path), the metadata 
will not have flowExecutionId.
-        // In this case, the current time is used as the flow executionId.
-        
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
-            Long.toString(System.currentTimeMillis()));
-
-        String message = "Flow was not compiled successfully.";
-        if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
-          message = message + " Compilation errors encountered: " + 
((FlowSpec) spec).getCompilationErrors();
-        }
-        flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
-
-        Optional<TimingEvent> flowCompileFailedTimer = 
this.eventSubmitter.transform(submitter ->
-            new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
-        Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-        conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.FAILED);
-        _log.warn("Cannot determine an executor to run on for Spec: " + spec);
-        if (flowCompileFailedTimer.isPresent()) {
-          flowCompileFailedTimer.get().stop(flowMetadata);
-        }
-        return;
-      }
-      conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SUCCESSFUL);
-
-      //If it is a scheduled flow (and hence, does not have flowExecutionId in 
the FlowSpec) and the flow compilation is successful,
-      // retrieve the flowExecutionId from the JobSpec.
-      
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
-          
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
-
-      if (flowCompilationTimer.isPresent()) {
-        flowCompilationTimer.get().stop(flowMetadata);
-      }
 
       // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
+      // Skip flow compilation as well, since we recompile after receiving 
event from DagActionStoreChangeMonitor later
       if (flowTriggerHandler.isPresent()) {
         // If triggerTimestampMillis is 0, then it was not set by the job 
trigger handler, and we cannot handle this event
         if (triggerTimestampMillis == 0L) {
@@ -331,38 +271,60 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis);
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, triggerEventTimestamp: {}]", flowAction,
             triggerTimestampMillis);
-      } else if (this.dagManager.isPresent()) {
-        submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
       } else {
-        // Schedule all compiled JobSpecs on their respective Executor
-        for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
-          DagManagerUtils.incrementJobAttempt(dagNode);
-          JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-
-          // Run this spec on selected executor
-          SpecProducer producer = null;
-          try {
-            producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-            Spec jobSpec = jobExecutionPlan.getJobSpec();
-
-            if (!((JobSpec) 
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
-              _log.warn("JobSpec does not contain flowExecutionId.");
-            }
-
-            Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
-            _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
-
-            Optional<TimingEvent> jobOrchestrationTimer = 
this.eventSubmitter.transform(submitter ->
-                new TimingEvent(submitter, 
TimingEvent.LauncherTimings.JOB_ORCHESTRATED));
+        // Compile flow spec and do corresponding checks
+        Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
+        Optional<TimingEvent> flowCompilationTimer =
+            this.eventSubmitter.transform(submitter -> new 
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+
+        if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+          populateFlowCompilationFailedEventMessage(spec, flowMetadata);
+          Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+          conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.FAILED);
+          _log.warn("Cannot determine an executor to run on for Spec: " + 
spec);
+          return;
+        }
+        conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SUCCESSFUL);
 
-            producer.addSpec(jobSpec);
+        addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+        if (flowCompilationTimer.isPresent()) {
+          flowCompilationTimer.get().stop(flowMetadata);
+        }
 
-            if (jobOrchestrationTimer.isPresent()) {
-              jobOrchestrationTimer.get().stop(jobMetadata);
+        // Depending on if DagManager is present, handle execution
+        if (this.dagManager.isPresent()) {
+          submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
+        } else {
+          // Schedule all compiled JobSpecs on their respective Executor
+          for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
+            DagManagerUtils.incrementJobAttempt(dagNode);
+            JobExecutionPlan jobExecutionPlan = dagNode.getValue();
+
+            // Run this spec on selected executor
+            SpecProducer producer = null;
+            try {
+              producer = 
jobExecutionPlan.getSpecExecutor().getProducer().get();
+              Spec jobSpec = jobExecutionPlan.getJobSpec();
+
+              if (!((JobSpec) 
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+                _log.warn("JobSpec does not contain flowExecutionId.");
+              }
+
+              Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
+              _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
+
+              Optional<TimingEvent> jobOrchestrationTimer = 
this.eventSubmitter.transform(
+                  submitter -> new TimingEvent(submitter, 
TimingEvent.LauncherTimings.JOB_ORCHESTRATED));
+
+              producer.addSpec(jobSpec);
+
+              if (jobOrchestrationTimer.isPresent()) {
+                jobOrchestrationTimer.get().stop(jobMetadata);
+              }
+            } catch (Exception e) {
+              _log.error("Cannot successfully setup spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer
+                  + " for flow: " + spec, e);
             }
-          } catch (Exception e) {
-            _log.error("Cannot successfully setup spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer
-                + " for flow: " + spec, e);
           }
         }
       }
@@ -374,9 +336,120 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void submitFlowToDagManager(FlowSpec flowSpec)
+  /**
+   * If it is a scheduled flow (and hence, does not have flowExecutionId in 
the FlowSpec) and the flow compilation is
+   * successful, retrieve the flowExecutionId from the JobSpec.
+   */
+  public void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata, 
Dag<JobExecutionPlan> jobExecutionPlanDag) {
+    
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
+            ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public boolean isExecutionPermittedHandler(Config flowConfig, Spec spec, 
String flowName, String flowGroup)
       throws IOException {
-    submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, 
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowName, flowGroup, allowConcurrentExecution)) {
+      _log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
+      conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
+      Instrumented.markMeter(this.skippedFlowsMeter);
+      if (!((FlowSpec) 
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        // For ad-hoc flow, we might already increase quota, we need to 
decrease here
+        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+          quotaManager.releaseQuota(dagNode);
+        }
+      }
+
+      // Send FLOW_FAILED event
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
+          + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flow spec to change this behaviour.");
+      if (this.eventSubmitter.isPresent()) {
+        new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Abstraction used to populate the message of and emit a FlowCompileFailed 
event for the Orchestrator.
+   * @param spec
+   * @param flowMetadata
+   */
+  public void populateFlowCompilationFailedEventMessage(Spec spec,
+      Map<String, String> flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the 
FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata 
will not have flowExecutionId.
+    // In this case, the current time is used as the flow executionId.
+    
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) 
spec).getCompilationErrors();
+    }
+    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+    Optional<TimingEvent> flowCompileFailedTimer = 
eventSubmitter.transform(submitter ->
+        new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+    if (flowCompileFailedTimer.isPresent()) {
+      flowCompileFailedTimer.get().stop(flowMetadata);
+    }
+  }
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(FlowSpec 
flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(flowConfig, flowSpec, flowName, 
flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    this.getSpecCompiler().awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(flowSpec, flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, 
InterruptedException {
+    Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag = 
handleChecksBeforeExecution(flowSpec);
+    if (optionalJobExecutionPlanDag.isPresent()) {
+      submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+    }
   }
 
   public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> 
jobExecutionPlanDag)
@@ -404,7 +477,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
    * @param allowConcurrentExecution
    * @return true if the {@link FlowSpec} allows concurrent executions or if 
no other instance of the flow is currently RUNNING.
    */
-  private boolean canRun(String flowName, String flowGroup, boolean 
allowConcurrentExecution) {
+  private boolean isExecutionPermitted(String flowName, String flowGroup, 
boolean allowConcurrentExecution) {
     if (allowConcurrentExecution) {
       return true;
     } else {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 456851612..870b68f53 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -190,15 +190,19 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
       spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
       this.orchestrator.submitFlowToDagManager(spec);
     } catch (URISyntaxException e) {
-      log.warn("Could not create URI object for flowId {} due to error {}", 
flowId, e.getMessage());
+      log.warn("Could not create URI object for flowId {}. Exception {}", 
flowId, e.getMessage());
       this.unexpectedErrors.mark();
       return;
     } catch (SpecNotFoundException e) {
-      log.warn("Spec not found for flow group: {} name: {} Exception: {}", 
flowGroup, flowName, e);
+      log.warn("Spec not found for flowId {} due to exception {}", flowId, 
e.getMessage());
       this.unexpectedErrors.mark();
       return;
     } catch (IOException e) {
-      log.warn("Failed to add Job Execution Plan for flow group: {} name: {} 
due to error {}", flowGroup, flowName, e);
+      log.warn("Failed to add Job Execution Plan for flowId {} due to 
exception {}", flowId, e.getMessage());
+      this.unexpectedErrors.mark();
+      return;
+    } catch (InterruptedException e) {
+      log.warn("SpecCompiler failed to reach healthy state before compilation 
of flowId {}. Exception: ", flowId, e);
       this.unexpectedErrors.mark();
       return;
     }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index e8c4fd443..603be2a17 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -25,6 +25,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -341,7 +342,8 @@ class CancelPredicate implements Predicate<Void> {
 class MockedDagManager extends DagManager {
 
   public MockedDagManager(Config config, boolean instrumentationEnabled) {
-    super(config, createJobStatusRetriever(), instrumentationEnabled);
+    super(config, createJobStatusRetriever(), 
Mockito.mock(Orchestrator.class), Mockito.mock(FlowCatalog.class),
+        instrumentationEnabled);
   }
 
   private static JobStatusRetriever createJobStatusRetriever() {

Reply via email to