[ 
https://issues.apache.org/jira/browse/GOBBLIN-1868?focusedWorklogId=874724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-874724
 ]

ASF GitHub Bot logged work on GOBBLIN-1868:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Aug/23 19:51
            Start Date: 04/Aug/23 19:51
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3731:
URL: https://github.com/apache/gobblin/pull/3731#discussion_r1284711212


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestr
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    this.flowStatusGenerator = flowStatusGenerator;
+    try {
+      String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);

Review Comment:
   I prefer the use of `ConfigUtils.getString` below, which takes a default, so all this can happen in a single line, w/o needing an `if` block.
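For illustration only, here is a minimal self-contained sketch of the one-line lookup. In the constructor itself, this would presumably mirror the existing `ConfigUtils.getString(config, JOB_START_SLA_UNITS, ...)` call, i.e. `ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS)`. In the sketch, a plain `Map` stands in for Typesafe `Config`. `ConfigLookup`, `SpecCompilerClassNameExample`, and the key/default strings are hypothetical.

```java
import java.util.Map;

// Hypothetical helper mimicking the shape of ConfigUtils.getString(config, key, default);
// a plain Map stands in for the Typesafe Config used in the real code.
final class ConfigLookup {
  private ConfigLookup() {}

  /** Returns the value configured for key, or defaultValue when the key is absent. */
  static String getString(Map<String, String> config, String key, String defaultValue) {
    return config.getOrDefault(key, defaultValue);
  }
}

final class SpecCompilerClassNameExample {
  // Hypothetical placeholders for the ServiceConfigKeys constants referenced in the diff.
  static final String FLOWCOMPILER_CLASS_KEY = "example.flowCompiler.class";
  static final String DEFAULT_FLOWCOMPILER_CLASS = "ExampleDefaultCompiler";

  private SpecCompilerClassNameExample() {}

  static String resolveClassName(Map<String, String> config) {
    // One line, no if-block: the default applies whenever the key is missing.
    return ConfigLookup.getString(config, FLOWCOMPILER_CLASS_KEY, DEFAULT_FLOWCOMPILER_CLASS);
  }
}
```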



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestr
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    this.flowStatusGenerator = flowStatusGenerator;
+    try {
+      String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+      }
+      log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+      this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)), config);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
+             ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+        ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);

Review Comment:
   I'm unclear what's meant by "flow concurrency". AFAIR, "allow concurrent executions" is flow-scoped rather than system-level, ain't it?
   
   also, the name `flowConcurrencyFlag` is less preferable than a name that precisely states the meaning/sense of the flag, like `isFlowConcurrencyEnabled` or `flowConcurrencyDisabled`
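Here is a sketch of the distinction. It assumes the semantics visible in `isExecutionPermittedHandler` further down, where the service-level value is passed as the fallback for the flow-scoped `flow.allowConcurrentExecution` setting. The class and field names are hypothetical, and a `Map` stands in for the flow's `Config`.

```java
import java.util.Map;

// Hypothetical sketch: the service-wide value acts only as a default, which the
// flow-scoped "flow.allowConcurrentExecution" property can override.
final class ConcurrencyPolicy {
  static final String FLOW_ALLOW_CONCURRENT_EXECUTION = "flow.allowConcurrentExecution";

  // Named for what the boolean means, rather than a generic "flag".
  private final boolean isFlowConcurrencyAllowedByDefault;

  ConcurrencyPolicy(boolean isFlowConcurrencyAllowedByDefault) {
    this.isFlowConcurrencyAllowedByDefault = isFlowConcurrencyAllowedByDefault;
  }

  /** The flow's own setting wins; otherwise fall back to the service-level default. */
  boolean isConcurrentExecutionAllowed(Map<String, String> flowConfig) {
    String override = flowConfig.get(FLOW_ALLOW_CONCURRENT_EXECUTION);
    return override == null ? isFlowConcurrencyAllowedByDefault : Boolean.parseBoolean(override);
  }
}
```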



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+  protected final MetricContext metricContext;
+  private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+  private Optional<Meter> skippedFlowsMeter;
+
+  @Inject
+  public SharedFlowMetricsContainer(Config config) {
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), SharedFlowMetricsContainer.class);
+    this.skippedFlowsMeter = Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+  }
+
+  /**
+   * Adds a new FlowGauge to the metric context if one does not already exist for this flow spec
+   */
+  public void addFlowGauge(Spec spec, Config flowConfig, String flowName, String flowGroup) {

Review Comment:
   I find widely-employed, non-semantic types like `String` error-prone, because particular instances are readily confused w/ one another, such as w/ param/arg mis-ordering. since `flowGroup` is less specific than `flowName`, my habit is to put it first, which appears to be how `MetricRegistry.name` wants them as well... and yet they're swapped in this method's params.
   
   let's seek uniform consistency, so a mis-ordering easily jumps out at maintainers. even better, though, would be to replace the two `String` params w/ a single `FlowId`
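Here is a minimal sketch of the single-param alternative. `FlowIdSketch` is a hypothetical stand-in: Gobblin's own `FlowId` type is not reproduced here, and its API is not assumed. The dotted `toMetricName` follows the same group-first, dot-joined shape that `MetricRegistry.name(group, name, suffix)` would produce.

```java
import java.util.Objects;

// Hypothetical value type bundling group and name; not Gobblin's FlowId.
final class FlowIdSketch {
  private final String flowGroup;
  private final String flowName;

  // Group first: the less specific component leads, matching the preferred ordering.
  FlowIdSketch(String flowGroup, String flowName) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
  }

  String getFlowGroup() { return flowGroup; }

  String getFlowName() { return flowName; }

  /** Dot-joined, group-first name, e.g. for metric names. */
  String toMetricName(String suffix) {
    return String.join(".", flowGroup, flowName, suffix);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof FlowIdSketch)) return false;
    FlowIdSketch other = (FlowIdSketch) o;
    return flowGroup.equals(other.flowGroup) && flowName.equals(other.flowName);
  }

  @Override
  public int hashCode() { return Objects.hash(flowGroup, flowName); }
}
```

With this, a signature such as `addFlowGauge(Spec spec, Config flowConfig, FlowId flowId)` (hypothetical) leaves no group/name ordering for call sites to get wrong.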



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {

Review Comment:
   I commented elsewhere about the lack of uniform ordering between `flowName` and `flowGroup` params... as well as a way to sidestep it via the `FlowId` type



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+        ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+      sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SKIPPED);
+      Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+      if (!((FlowSpec) spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        // For ad-hoc flow, we might already increase quota, we need to decrease here
+        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+          quotaManager.releaseQuota(dagNode);
+        }
+      }
+
+      // Send FLOW_FAILED event
+      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+          + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
+      if (eventSubmitter.isPresent()) {
+        new TimingEvent(eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+      }
+      return false;
+    }
+    return true;

Review Comment:
   not sure where we are in the nesting, so I may not have this right... *but that's the problem I'm suggesting to solve here!* ;)
   
   how about:
   ```
   if (isExecPermitted(...)) {
     return true;
   } else {
     ... // all the many other lines of code
   }
   ```
   ?
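Here is a trimmed-down sketch of that guard-clause shape. Once the `if` returns, the `else` becomes optional. The metrics, quota, and event calls are replaced by hypothetical stand-ins that just record strings, and `ExecutionGateSketch` is not the PR's class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, trimmed-down shape of isExecutionPermittedHandler: the gauge/meter updates,
// quota release and FLOW_FAILED TimingEvent are replaced by strings added to emittedEvents.
final class ExecutionGateSketch {
  final List<String> emittedEvents = new ArrayList<>();

  boolean isExecutionPermittedHandler(BooleanSupplier isExecutionPermitted, String flowGroup, String flowName) {
    if (isExecutionPermitted.getAsBoolean()) {
      return true; // common case exits first; nothing below is needed to follow it
    }

    // Only the "skip" path remains, at a single level of nesting.
    String flow = flowGroup + "/" + flowName;
    emittedEvents.add("SKIPPED " + flow);     // stand-in for gauge update + skipped-flows meter
    emittedEvents.add("FLOW_FAILED " + flow); // stand-in for the FLOW_FAILED TimingEvent
    return false;
  }
}
```

This shape has a side benefit: with the happy path returning first, it is easier to see a detail of the quoted code. The `specCompiler.compileFlow(spec)` call that precedes the check appears to be needed only on the skip path (for quota release), so it could likely move below the early return.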



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(

Review Comment:
   perhaps more specific... `compileToExecutionPlan`? `createExecutionPlanIfValid`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -206,7 +212,12 @@ public String toString() {
   protected final Long defaultJobStartSlaTimeMillis;
   @Getter
   private final JobStatusRetriever jobStatusRetriever;
-  private final Orchestrator orchestrator;
+  private final FlowStatusGenerator flowStatusGenerator;
+  private final UserQuotaManager quotaManager;
+  private final ClassAliasResolver<SpecCompiler> aliasResolver;

Review Comment:
   is this used outside the ctor?  if not, why an instance member?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {

Review Comment:
   naming-wise: "container" has a very specific meaning: an exec env that hosts user-supplied code, e.g. "my app runs in a container," or "container deployment failed." more rarely, it's a synonym for "collection", especially the abstract concept, e.g. "Map, List, and PriorityQueue are Java container types."
   
   this class looks like simply `SharedFlowMetrics`, or a `SharedFlowMetricsSingleton` or `SharedFlowMetricsTracker`...



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, Orchestr
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    this.flowStatusGenerator = flowStatusGenerator;
+    try {
+      String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+      }
+      log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+      this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)), config);
+    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
+             ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }

Review Comment:
   this ctor is 46 lines long now, with 13 involving initializing this `SpecCompiler`. let's streamline and minimize the distraction by abstracting it into a one-line method call, like:
   ```
   this.specCompiler = createSpecCompiler(config);
   ```
   
   alternatively, is there a way to use `GobblinConstructorUtils`, like you did below for the `UserQuotaManager`? e.g. - https://github.com/apache/gobblin/blob/2917b6311949d25f898a6691fe0730d344299d25/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/GobblinConstructorUtils.java#L110
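Here is a sketch of what such a helper might look like. It keeps alias resolution local to construction, which would also address the earlier question about `aliasResolver` being an instance member. It uses only `java.lang.reflect`. `SpecCompilerSketch`, `IdentityCompilerSketch`, `SpecCompilerFactory`, and the `Map`-based alias table are hypothetical stand-ins for `SpecCompiler`, a concrete compiler, and `ClassAliasResolver`; they are not Gobblin APIs. The extra parameters exist only to keep the sketch self-contained; the real call could still read `this.specCompiler = createSpecCompiler(config);`.

```java
import java.util.Map;

// Hypothetical stand-ins: SpecCompilerSketch plays the role of SpecCompiler, and the
// alias Map plays the role of ClassAliasResolver. Neither is a Gobblin API.
interface SpecCompilerSketch {}

final class IdentityCompilerSketch implements SpecCompilerSketch {
  final Map<String, String> config;

  IdentityCompilerSketch(Map<String, String> config) {
    this.config = config;
  }
}

final class SpecCompilerFactory {
  private SpecCompilerFactory() {}

  /** Resolves an alias (or class name) and invokes that class's (Map config) constructor. */
  static SpecCompilerSketch createSpecCompiler(Map<String, String> config, String classNameOrAlias,
      Map<String, String> aliases) {
    // Alias resolution is only needed during construction, so it stays local here.
    String className = aliases.getOrDefault(classNameOrAlias, classNameOrAlias);
    try {
      Class<?> clazz = Class.forName(className);
      return (SpecCompilerSketch) clazz.getDeclaredConstructor(Map.class).newInstance(config);
    } catch (ReflectiveOperationException e) {
      // Covers ClassNotFound, NoSuchMethod, Instantiation, IllegalAccess and InvocationTarget.
      throw new RuntimeException("Unable to construct SpecCompiler " + className, e);
    }
  }
}
```

Catching `ReflectiveOperationException` covers all five exceptions in the current multi-catch, since each of them extends it.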



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+  protected final MetricContext metricContext;
+  private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();

Review Comment:
   everywhere below, I see `spec.getUri().toString()`... can't we simply use a URI as this map's key? Doing so additionally carries richer semantics.
   
   also: up to you, but my naming convention here would be `flowGaugeStateBySpecUri`
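A minimal sketch of the URI-keyed map suggested above, assuming a simplified container: `FlowGaugeRegistrySketch`, the `UNKNOWN` state value, and the example URIs are hypothetical, and plain `java.net.URI` keys stand in for `spec.getUri()`.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SharedFlowMetricsContainer's gauge bookkeeping;
// Spec and FlowCompiledState are replaced by plain types so this compiles on its own.
class FlowGaugeRegistrySketch {
  // Hypothetical values; the PR only shows CompiledState.SKIPPED
  enum CompiledState { UNKNOWN, SKIPPED }

  // Keyed by the spec URI itself rather than spec.getUri().toString()
  private final Map<URI, CompiledState> flowGaugeStateBySpecUri = new HashMap<>();

  void track(URI specUri) {
    flowGaugeStateBySpecUri.putIfAbsent(specUri, CompiledState.UNKNOWN);
  }

  // Only specs that are already tracked get updated, as in conditionallyUpdateFlowGaugeSpecState
  void conditionallyUpdate(URI specUri, CompiledState state) {
    flowGaugeStateBySpecUri.computeIfPresent(specUri, (uri, previous) -> state);
  }

  CompiledState stateOf(URI specUri) {
    return flowGaugeStateBySpecUri.get(specUri);
  }
}
```

`computeIfPresent` folds the contains-then-update pair into one call. One side effect worth knowing: `URI.equals` compares schemes and hosts case-insensitively, so URI keys are not strictly equivalent to their `toString()` forms.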



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+        ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+      sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SKIPPED);
+      Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+      if (!((FlowSpec) spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {

Review Comment:
   NBD... but I do imagine it being clearer to read:
   ```
   if (isScheduledFlow(spec)) ...
   ```
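A possible shape for the helper, as a sketch only: `ScheduledFlowSketch` is hypothetical, it takes the flow's `Properties` rather than a `FlowSpec`, and the literal key `"job.schedule"` is an assumption standing in for `ConfigurationKeys.JOB_SCHEDULE_KEY`. Note that the branch in the PR runs for flows without a schedule (ad-hoc flows), so the call site would read `if (!isScheduledFlow(spec))`.

```java
import java.util.Properties;

// Hypothetical helper; in Gobblin it would take the FlowSpec and read ConfigurationKeys.JOB_SCHEDULE_KEY.
final class ScheduledFlowSketch {
  // Assumed key name for illustration only
  static final String JOB_SCHEDULE_KEY = "job.schedule";

  private ScheduledFlowSketch() {}

  /** A flow counts as scheduled when its properties carry a schedule; otherwise it is ad-hoc. */
  static boolean isScheduledFlow(Properties flowProps) {
    return flowProps.containsKey(JOB_SCHEDULE_KEY);
  }
}
```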



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+    if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, flowName);

Review Comment:
   (I notice we log them as group, then name, so that may be the order maintainers become most familiar with)
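One way to sidestep the group/name ordering question altogether, offered as an alternative rather than what the PR does: bundle both identifiers in a small value type whose constructor takes the group first, matching the log statement. `FlowIdSketch` and its accessors are hypothetical names.

```java
import java.util.Objects;

// Hypothetical value type: callers pass one object, so group and name cannot be transposed.
final class FlowIdSketch {
  private final String flowGroup;
  private final String flowName;

  // Group first, matching the order used in the log message
  FlowIdSketch(String flowGroup, String flowName) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
  }

  String getFlowGroup() { return flowGroup; }

  String getFlowName() { return flowName; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof FlowIdSketch)) return false;
    FlowIdSketch other = (FlowIdSketch) o;
    return flowGroup.equals(other.flowGroup) && flowName.equals(other.flowName);
  }

  @Override
  public int hashCode() { return Objects.hash(flowGroup, flowName); }

  // Same wording as the existing log line, so it can be passed straight to an SLF4J "{}" placeholder
  @Override
  public String toString() { return "flowGroup: " + flowGroup + ", flowName: " + flowName; }
}
```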



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+        ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+      sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, SharedFlowMetricsContainer.CompiledState.SKIPPED);
+      Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+      if (!((FlowSpec) spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        // For ad-hoc flow, we might already increase quota, we need to decrease here
+        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+          quotaManager.releaseQuota(dagNode);
+        }
+      }
+
+      // Send FLOW_FAILED event
+      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+          + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
+      if (eventSubmitter.isPresent()) {
+        new TimingEvent(eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Check if a FlowSpec instance is allowed to run.
+   *
+   * @param flowName
+   * @param flowGroup
+   * @param allowConcurrentExecution
+   * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
+   */
+  private static boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName,
+      String flowGroup, boolean allowConcurrentExecution) {
+    if (allowConcurrentExecution) {
+      return true;
+    } else {
+      return !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+    }

Review Comment:
   nit:
   ```
   return allowConcurrent || !fsg.isFlowRunning(...)
   ```
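The nit can be checked mechanically. In this sketch `FlowRunningCheck` is a hypothetical single-method stand-in for `FlowStatusGenerator`, and both the PR's if/else form and the suggested one-liner are kept side by side. Because `||` short-circuits, `isFlowRunning` is only consulted when concurrent executions are disallowed, exactly as in the original.

```java
// Hypothetical stand-in for FlowStatusGenerator, exposing only the method used here
// (argument order mirrors the PR's call: flowName, then flowGroup).
interface FlowRunningCheck {
  boolean isFlowRunning(String flowName, String flowGroup);
}

final class ExecutionPermitSketch {
  private ExecutionPermitSketch() {}

  // The PR's if/else form, kept for comparison
  static boolean isExecutionPermittedVerbose(FlowRunningCheck fsg, String flowName, String flowGroup,
      boolean allowConcurrentExecution) {
    if (allowConcurrentExecution) {
      return true;
    } else {
      return !fsg.isFlowRunning(flowName, flowGroup);
    }
  }

  // The suggested single expression; isFlowRunning runs only when allowConcurrentExecution is false
  static boolean isExecutionPermitted(FlowRunningCheck fsg, String flowName, String flowGroup,
      boolean allowConcurrentExecution) {
    return allowConcurrentExecution || !fsg.isFlowRunning(flowName, flowGroup);
  }
}
```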



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+  protected final MetricContext metricContext;
+  private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+  private Optional<Meter> skippedFlowsMeter;
+
+  @Inject
+  public SharedFlowMetricsContainer(Config config) {
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), SharedFlowMetricsContainer.class);
+    this.skippedFlowsMeter = Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+  }
+
+  /**
+   * Adds a new FlowGauge to the metric context if one does not already exist for this flow spec
+   */
+  public void addFlowGauge(Spec spec, Config flowConfig, String flowName, String flowGroup) {
+    // Only register the metric of flows that are scheduled, run once flows should not be tracked indefinitely
+    if (!flowGauges.containsKey(spec.getUri().toString()) && flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+      String flowCompiledGaugeName =
+          MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
+      flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
+      ContextAwareGauge<Integer> gauge =
+          RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName,
+              () -> flowGauges.get(spec.getUri().toString()).state.value);
+      RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+    }
+  }
+  /**
+   * Updates the flowgauge related to the spec if the gauge is being tracked for the flow
+   * @param spec FlowSpec to be updated
+   * @param state desired state to set the gauge
+   */
+  public void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState state) {
+    if (flowGauges.containsKey(spec.getUri().toString())) {
+      flowGauges.get(spec.getUri().toString()).setState(state);
+    }
+  }
+
+  @Setter
+  public static class FlowCompiledState {

Review Comment:
   nit: given this is mentioned above, I'd suggest introducing it beforehand, by moving this def to the top of the enclosing class
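A dependency-free sketch of the suggested ordering, with the nested state types declared before the map that uses them. Assumptions: `FlowMetricsLayoutSketch` is hypothetical, a `Supplier<Integer>` stands in for the `ContextAwareGauge` registered with `RootMetricContext`, the `UNKNOWN` value and both integer codes are invented (the PR only shows `CompiledState.SKIPPED` and a `value` field), and the scheduled-flow check in `addFlowGauge` is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical layout sketch of SharedFlowMetricsContainer with nested types declared first.
class FlowMetricsLayoutSketch {

  // Hypothetical values and codes; the PR only shows CompiledState.SKIPPED and an int `value`
  enum CompiledState {
    UNKNOWN(-1),
    SKIPPED(2);

    final int value;

    CompiledState(int value) {
      this.value = value;
    }
  }

  static class FlowCompiledState {
    private CompiledState state = CompiledState.UNKNOWN;

    void setState(CompiledState state) {
      this.state = state;
    }
  }

  // Declared after the types it uses, so readers meet FlowCompiledState first
  private final Map<String, FlowCompiledState> flowGauges = new HashMap<>();

  // The returned "gauge" re-reads the map on every poll instead of capturing a snapshot
  Supplier<Integer> addFlowGauge(String specUri) {
    flowGauges.putIfAbsent(specUri, new FlowCompiledState());
    return () -> flowGauges.get(specUri).state.value;
  }

  void conditionallyUpdateFlowGaugeSpecState(String specUri, CompiledState state) {
    FlowCompiledState tracked = flowGauges.get(specUri);
    if (tracked != null) {
      tracked.setState(state);
    }
  }
}
```

Because the gauge lambda re-reads the map on each poll, a later `conditionallyUpdateFlowGaugeSpecState` call is visible without re-registering the gauge, which is the same property the PR's `() -> flowGauges.get(...).state.value` relies on.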



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);

Review Comment:
   is this different from what just happened inside `isExecutionPermittedHandler`? If not, rather than repeat, let's return `Optional<Dag<JobExecutionPlan>>` from the latter, rather than the boolean. If there's a reason NOT to do that, then please add a comment here describing the pitfall, so a maintainer gets a clear idea of what to watch out for before potentially trying that.
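A sketch of the compile-once shape, under stated assumptions: `CompileOnceSketch` is hypothetical, and generic `S`/`P` types plus a `Function` and a `BooleanSupplier` stand in for `FlowSpec`, `Dag<JobExecutionPlan>`, `SpecCompiler.compileFlow`, and the running-flow check. The plan is compiled before the check because the skip path in the PR also needs it (to release quota for ad-hoc flows).

```java
import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

// Hypothetical sketch: the permission check compiles once and hands the plan back,
// so the caller does not compile a second time. Not Gobblin's actual signatures.
final class CompileOnceSketch {
  private CompileOnceSketch() {}

  /** Returns the compiled plan if execution is permitted (and compilation produced one), empty otherwise. */
  static <S, P> Optional<P> compileIfPermitted(S spec, Function<S, P> compiler,
      BooleanSupplier executionPermitted) {
    P plan = compiler.apply(spec); // single compile, also usable by the skip path
    if (!executionPermitted.getAsBoolean()) {
      // Skip path: the PR releases quota and emits FLOW_FAILED here (omitted in this sketch)
      return Optional.empty();
    }
    return Optional.ofNullable(plan); // a null plan (failed compilation) also maps to empty
  }
}
```

One ordering detail to check before adopting it: in the PR, the second `compileFlow` call runs after `specCompiler.awaitHealthy()`, while the first one (inside `isExecutionPermittedHandler`) does not, so a single compile would likely need that wait moved in front of it.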



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+
+/**
+ * Stateless class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in a later stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
+   * flowSpec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer sharedFlowMetricsContainer,

Review Comment:
   nit: a "handler" is usually a class or a func/method reference invoked as a callback.  this also isn't really a predicate (e.g. `isXYZ`), as it performs so many side-effects, like quota mgmt, etc.
   
   maybe `validateExecutionOrFinalizeRejection`?  (sorry, you're likely to find a better name)
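A small illustration of the naming point, under stated assumptions. `validateExecutionOrFinalizeRejection` is the name floated in the comment above. `isBlockedByRunningExecution` and the `emittedEvents` list, which stands in for the `EventSubmitter`, are hypothetical. The sketch splits the pure check, where an `is*` name is accurate, from the wrapper that carries the side effect:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: emittedEvents stands in for Gobblin's EventSubmitter.
final class RejectionNamingSketch {
  final List<String> emittedEvents = new ArrayList<>();

  // A true predicate: no side effects, so an is* name is accurate.
  static boolean isBlockedByRunningExecution(
      String flowKey, Set<String> runningFlows, boolean allowConcurrentExecution) {
    return !allowConcurrentExecution && runningFlows.contains(flowKey);
  }

  // Side-effecting wrapper: the name says a rejection is finalized here.
  boolean validateExecutionOrFinalizeRejection(
      String flowKey, Set<String> runningFlows, boolean allowConcurrentExecution) {
    if (!isBlockedByRunningExecution(flowKey, runningFlows, allowConcurrentExecution)) {
      return true;
    }
    // Record a FLOW_FAILED-style event for the rejected execution.
    emittedEvents.add("FLOW_FAILED:" + flowKey);
    return false;
  }
}
```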



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+public final class ExecutionChecksUtil {

Review Comment:
   `FlowCompilationValidationHelper`?
   
   also, I completely agree w/ the statelessness (and `final`ity) of this abstraction, but just for comprehensibility I'd suggest creating a ctor merely to separate "the args that stay the same" from "the changing ones", to put the latter front and center.
   
   e.g. so if not called like this:
   ```
   this.fcHelper = new FlowCompilationValidationHelper(
       sharedFlowMetricsContainer, specCompiler, quotaManager,
       optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled);
   
   ... // way later on
   Optional<Dag<JobExecutionPlan>> optDag = fcHelper.createExecutionPlanIfValid(flowSpec);
   ```
   then at least like this:
   ```
   Optional<Dag<JobExecutionPlan>> optDag = new FlowCompilationValidationHelper(
       sharedFlowMetricsContainer, specCompiler, quotaManager,
       optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled)
       .createExecutionPlanIfValid(flowSpec);
   ```
   
   p.s. populating the compilation failure message could remain `static`
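A minimal sketch of the constructor-based shape proposed above. `FlowCompilationValidationHelper` and `createExecutionPlanIfValid` are the names from the comment. The rest is assumed for illustration: the `Sketch` suffix, a `String` id in place of `FlowSpec`, a `Function` in place of `SpecCompiler`, a `Predicate` in place of the flow-status lookup, a `List<String>` in place of `Dag<JobExecutionPlan>`, and a `System.err` line in place of event emission. All fields are `final`, so the helper holds no mutable state:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Sketch of the reviewer's proposed shape, using simplified stand-in types.
final class FlowCompilationValidationHelperSketch {
  // Collaborators that stay the same across calls are fixed once, at construction.
  private final Function<String, List<String>> specCompiler;
  private final Predicate<String> isAlreadyRunning;
  private final boolean isFlowConcurrencyEnabled;

  FlowCompilationValidationHelperSketch(Function<String, List<String>> specCompiler,
      Predicate<String> isAlreadyRunning, boolean isFlowConcurrencyEnabled) {
    this.specCompiler = specCompiler;
    this.isAlreadyRunning = isAlreadyRunning;
    this.isFlowConcurrencyEnabled = isFlowConcurrencyEnabled;
  }

  // The argument that changes per call is the only parameter.
  Optional<List<String>> createExecutionPlanIfValid(String flowSpecId) {
    if (!isFlowConcurrencyEnabled && isAlreadyRunning.test(flowSpecId)) {
      return Optional.empty();
    }
    List<String> plan = specCompiler.apply(flowSpecId);
    if (plan == null || plan.isEmpty()) {
      // Stand-in for emitting a compilation-failure event.
      System.err.println(compilationFailureMessage(flowSpecId));
      return Optional.empty();
    }
    return Optional.of(plan);
  }

  // Uses no instance state, so it can stay static, per the p.s.
  static String compilationFailureMessage(String flowSpecId) {
    return "Flow compilation failed for " + flowSpecId;
  }
}
```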





Issue Time Tracking
-------------------

    Worklog Id:     (was: 874724)
    Time Spent: 0.5h  (was: 20m)

> Refactor Common Utils between Orchestrator & DagManager
> -------------------------------------------------------
>
>                 Key: GOBBLIN-1868
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1868
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Create a Util class to contain functionality re-used between the DagManager 
> and Orchestrator when launching executions of a flow spec. In the common 
> case, the Orchestrator receives a flow to orchestrate, performs necessary 
> validations, and forwards the execution responsibility to the DagManager. The 
> DagManager's responsibility is to carry out any flow action requests. 
> However, with launch executions now being stored in the DagActionStateStore, 
> on restart or leadership change the DagManager has to perform validations 
> before executing any launch actions the previous leader was unable to 
> complete. Rather than duplicating the code or introducing a circular 
> dependency between the DagManager and Orchestrator, this class is utilized to 
> store the common functionality. It is stateless and requires all stateful 
> pieces to be passed as input from the caller.
> * Note: We expect further refactoring to be done to the DagManager in a later 
> stage of multi-active development so we do not attempt *major* reorganization 
> as abstractions may change. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
