phet commented on code in PR #3731:
URL: https://github.com/apache/gobblin/pull/3731#discussion_r1284711212


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever 
jobStatusRetriever, Orchestr
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    this.flowStatusGenerator = flowStatusGenerator;
+    try {
+      String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = 
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);

Review Comment:
   I prefer the use of `ConfigUtils.getString` below, which takes a default, so 
all this can happen in a single line, w/o needing an `if` block.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever 
jobStatusRetriever, Orchestr
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    this.flowStatusGenerator = flowStatusGenerator;
+    try {
+      String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = 
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+      }
+      log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
 config);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException |
+             ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+        ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);

Review Comment:
   I'm unclear what's meant by "flow concurrency".  AFAIR "allow concurrent 
executions" is flow-scoped, rather than system-level, ain't it?
   
   also, the name `flowConcurrencyFlag` is less preferable to precisely stating 
the meaning/sense of the flag, like `isFlowConcurrencyEnabled` or 
`flowConcurrencyDisabled`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} 
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a 
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+  protected final MetricContext metricContext;
+  private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+  private Optional<Meter> skippedFlowsMeter;
+
+  @Inject
+  public SharedFlowMetricsContainer(Config config) {
+    this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
SharedFlowMetricsContainer.class);
+    this.skippedFlowsMeter = 
Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+  }
+
+  /**
+   * Adds a new FlowGauge to the metric context if one does not already exist 
for this flow spec
+   */
+  public void addFlowGauge(Spec spec, Config flowConfig, String flowName, 
String flowGroup) {

Review Comment:
   I find widely-employed, non-semantic types like `String` error-prone, 
because particular instances are readily confused w/ one another, such as w/ 
param/arg mis-ordering.  since `flowGroup` is less-specific than `flowName`, my 
habit is to put it first, which appears to be how `MetricRegistry.name` wants 
them as well... and yet they're swapped in this method's params.
   
   let's seek uniform consistency, so a misordering easily jumps out at 
maintainers.  nonetheless, even better, would be to replace the two `String` 
params w/ a single `FlowId`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler 
specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator 
flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, 
quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, 
flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer 
sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, 
Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean 
flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {

Review Comment:
   I commented elsewhere about lack of uniform ordering between `flowName` and 
`flowGroup` params... as well as way to sidestep via `FlowId` type



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler 
specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator 
flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, 
quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, 
flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer 
sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, 
Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean 
flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+        ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, 
allowConcurrentExecution)) {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
+      sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, 
SharedFlowMetricsContainer.CompiledState.SKIPPED);
+      
Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+      if (!((FlowSpec) 
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        // For ad-hoc flow, we might already increase quota, we need to 
decrease here
+        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+          quotaManager.releaseQuota(dagNode);
+        }
+      }
+
+      // Send FLOW_FAILED event
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
+          + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flow spec to change this behaviour.");
+      if (eventSubmitter.isPresent()) {
+        new TimingEvent(eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+      }
+      return false;
+    }
+    return true;

Review Comment:
   not sure where we are in the nesting, so that I've got this right... *but 
that's the problem I'm suggesting to solve here!* ;)
   
   how about:
   ```
   if (isExecPermitted(...)) {
     return true;
   } else {
     ... // all the many other lines of code
   }
   ```
   ?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(

Review Comment:
   perhaps more specific... `compileToExecutionPlan`?  
`createExecutionPlanIfValid`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -206,7 +212,12 @@ public String toString() {
   protected final Long defaultJobStartSlaTimeMillis;
   @Getter
   private final JobStatusRetriever jobStatusRetriever;
-  private final Orchestrator orchestrator;
+  private final FlowStatusGenerator flowStatusGenerator;
+  private final UserQuotaManager quotaManager;
+  private final ClassAliasResolver<SpecCompiler> aliasResolver;

Review Comment:
   is this used outside the ctor?  if not, why an instance member?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} 
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a 
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {

Review Comment:
   naming-wise: "container" has very specific meaning: an exec env that hosts 
user-supplied code.  e.g. "my app runs in a container," or "container 
deployment failed."   more rarely, it's a synonym for "collection", especially 
the abstract concept.  e.g. "Map, List, and PriorityQueue are Java container 
types."
   
   this class, looks like simply `SharedFlowMetrics` or the 
`SharedFlowMetricsSingleton` or a `SharedFlowMetricsTracker`...



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -240,7 +252,26 @@ public DagManager(Config config, JobStatusRetriever 
jobStatusRetriever, Orchestr
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    this.flowStatusGenerator = flowStatusGenerator;
+    try {
+      String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = 
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+      }
+      log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
 config);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException |
+             ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }

Review Comment:
   this ctor is 46 lines long now, with 13 involving initializing this 
`SpecCompiler`.  let's streamline and minimize the distraction by abstracting 
it into a one-line method call, like:
   ```
   this.specCompiler = createSpecCompiler(config);
   ```
   
   alternatively, is there a way to use the `GobblinConstructorUtils`, like you 
did below for the `UserQuotaManager`?  e.g. - 
https://github.com/apache/gobblin/blob/2917b6311949d25f898a6691fe0730d344299d25/gobblin-utility/src/main/java/org/apache/gobblin/util/reflection/GobblinConstructorUtils.java#L110



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} 
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a 
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+  protected final MetricContext metricContext;
+  private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();

Review Comment:
   everywhere below, I see `spec.getUri().toString()`... can't we simply use a 
URI as this map's key, as doing so additionally carries richer semantics
   
   also: up to you, but my naming convention here would be 
`flowGaugeStateBySpecUri`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler 
specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator 
flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, 
quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, 
flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer 
sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, 
Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean 
flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+        ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, 
allowConcurrentExecution)) {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
+      sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, 
SharedFlowMetricsContainer.CompiledState.SKIPPED);
+      
Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+      if (!((FlowSpec) 
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {

Review Comment:
   NBD... but I do imagine it being clearer to read:
   ```
   if (isScheduledFlow(spec)) ...
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler 
specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator 
flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, 
quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, 
flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer 
sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, 
Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean 
flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+        ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, 
allowConcurrentExecution)) {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);

Review Comment:
   (I notice we log them as group, then name--so that may be what maintainers 
become most familiar w/)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler 
specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator 
flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, 
quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, 
flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer 
sharedFlowMetricsContainer,
+      SpecCompiler specCompiler, UserQuotaManager quotaManager, 
Optional<EventSubmitter> eventSubmitter,
+      FlowStatusGenerator flowStatusGenerator, Logger log, boolean 
flowConcurrencyFlag, Config flowConfig, Spec spec,
+      String flowName, String flowGroup) throws IOException {
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+        ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
flowConcurrencyFlag);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+
+    if (!isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, 
allowConcurrentExecution)) {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
+      sharedFlowMetricsContainer.conditionallyUpdateFlowGaugeSpecState(spec, 
SharedFlowMetricsContainer.CompiledState.SKIPPED);
+      
Instrumented.markMeter(sharedFlowMetricsContainer.getSkippedFlowsMeter());
+      if (!((FlowSpec) 
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        // For ad-hoc flow, we might already increase quota, we need to 
decrease here
+        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+          quotaManager.releaseQuota(dagNode);
+        }
+      }
+
+      // Send FLOW_FAILED event
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
+          + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flow spec to change this behaviour.");
+      if (eventSubmitter.isPresent()) {
+        new TimingEvent(eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+      }
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Check if a FlowSpec instance is allowed to run.
+   *
+   * @param flowName
+   * @param flowGroup
+   * @param allowConcurrentExecution
+   * @return true if the {@link FlowSpec} allows concurrent executions or if 
no other instance of the flow is currently RUNNING.
+   */
+  private static boolean isExecutionPermitted(FlowStatusGenerator 
flowStatusGenerator, String flowName,
+      String flowGroup, boolean allowConcurrentExecution) {
+    if (allowConcurrentExecution) {
+      return true;
+    } else {
+      return !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+    }

Review Comment:
   nit:
   ```
   return allowConcurrent || !fsg.isFlowRunning(...)
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsContainer.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} 
and {@link DagManager} so we can easily
+ * track all flow compilations and skipped flows handled between the two in a 
common place.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsContainer {
+  protected final MetricContext metricContext;
+  private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
+  private Optional<Meter> skippedFlowsMeter;
+
+  @Inject
+  public SharedFlowMetricsContainer(Config config) {
+    this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
SharedFlowMetricsContainer.class);
+    this.skippedFlowsMeter = 
Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+  }
+
+  /**
+   * Adds a new FlowGauge to the metric context if one does not already exist 
for this flow spec
+   */
+  public void addFlowGauge(Spec spec, Config flowConfig, String flowName, 
String flowGroup) {
+    // Only register the metric of flows that are scheduled, run once flows 
should not be tracked indefinitely
+    if (!flowGauges.containsKey(spec.getUri().toString()) && 
flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+      String flowCompiledGaugeName =
+          MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
flowGroup, flowName, ServiceMetricNames.COMPILED);
+      flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
+      ContextAwareGauge<Integer> gauge =
+          RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName,
+              () -> flowGauges.get(spec.getUri().toString()).state.value);
+      RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+    }
+  }
+  /**
+   * Updates the flowgauge related to the spec if the gauge is being tracked 
for the flow
+   * @param spec FlowSpec to be updated
+   * @param state desired state to set the gauge
+   */
+  public void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState 
state) {
+    if (flowGauges.containsKey(spec.getUri().toString())) {
+      flowGauges.get(spec.getUri().toString()).setState(state);
+    }
+  }
+
+  @Setter
+  public static class FlowCompiledState {

Review Comment:
   nit: given this is mentioned above, I'd suggest introducing beforehand, by 
moving this def to the top of the enclosing class



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler 
specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator 
flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, 
quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, 
flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);

Review Comment:
   is this different from what just happened inside 
`isExecutionPermittedHandler`?  if not, rather than repeat, let's return 
`Optional<Dag<JobExecutionPlan>>` from the latter, rather than the boolean.  if 
there's a reason NOT to do that, then please add a comment here, describing the 
pitfall, so a maintainer gets a clear idea of what to watch out, before 
potentially trying that



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {
+
+  /**
+   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
+   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
+   * caller.
+   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
+   */
+  public static Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(
+      SharedFlowMetricsContainer sharedFlowMetricsContainer, SpecCompiler 
specCompiler, UserQuotaManager quotaManager,
+      Optional<EventSubmitter> eventSubmitter, FlowStatusGenerator 
flowStatusGenerator, Logger log,
+      boolean flowConcurrencyFlag, FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+    if (!isExecutionPermittedHandler(sharedFlowMetricsContainer, specCompiler, 
quotaManager, eventSubmitter,
+        flowStatusGenerator, log, flowConcurrencyFlag, flowConfig, flowSpec, 
flowName, flowGroup)) {
+      return Optional.absent();
+    }
+
+    //Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
+    Optional<TimingEvent> flowCompilationTimer =
+        eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return Optional.of(jobExecutionPlanDag);
+  }
+
+  /**
+   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
+   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
+   * @return true if caller can proceed to execute flow, false otherwise
+   * @throws IOException
+   */
+  public static boolean isExecutionPermittedHandler(SharedFlowMetricsContainer 
sharedFlowMetricsContainer,

Review Comment:
   nit: a "handler" is usually a class or a func/method reference invoked as a 
callback.  this also isn't really a predicate (e.g. `isXYZ`), as it performs so 
many side-effects, like quota mgmt, etc.
   
   maybe `validateExecutionOrFinalizeRejection`?  (sorry, you're likely to find 
a better name)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/ExecutionChecksUtil.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+import org.slf4j.Logger;
+
+
+/**
+ * Stateless class with functionality meant to be re-used between the 
DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a 
flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. 
The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now 
being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations 
before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or 
introducing a circular dependency between
+ * the DagManager and Orchestrator, this class is utilized to store the common 
functionality. It is stateless and
+ * requires all stateful pieces to be passed as input from the caller.
+ * Note: We expect further refactoring to be done to the DagManager in later 
stage of multi-active development so we do
+ * not attempt major reorganization as abstractions may change.
+ */
+public final class ExecutionChecksUtil {

Review Comment:
   `FlowCompilationValidationHelper`?
   
   also, I completely agree w/ the statelessness (and `final`ity) of this 
abstraction, but just for comprehensibility I'd suggest, creating a ctor merely 
to separate "the args that stay the same" from "the changing ones", to put the 
latter front and center.
   
   e.g. so if not called like this:
   ```
   this.fcHelper = new FlowCompilationValidationHelper(
       sharedFlowMetricsContainer, specCompiler, quotaManager,
       optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled);
   
   ... // way later on
   Optional<Dag<JobExecutionPlan>> optDag = 
fcHelper.createExecutionPlanIfValid(flowSpec);
   ```
   then at least like this:
   ```
   Optional<Dag<JobExecutionPlan>> optDag = new FlowCompilationValidationHelper(
       sharedFlowMetricsContainer, specCompiler, quotaManager,
       optEventSubmitter, flowStatusGenerator, log, isFlowConcurrencyEnabled)
       .createExecutionPlanIfValid(flowSpec);
   ```
   
   p.s. populating the compilation failure message could remain `static`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to