This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 01036ccd4 [GOBBLIN-1868] Introduce FlowCompilationValidationHelper & SharedFlowMetricsSingleton for sharing between Orchestrator & DagManager (#3731)
01036ccd4 is described below

commit 01036ccd45ae4ddf21a6e60acc84da70ccd097c9
Author: umustafi <[email protected]>
AuthorDate: Mon Aug 7 13:19:55 2023 -0700

    [GOBBLIN-1868] Introduce FlowCompilationValidationHelper & SharedFlowMetricsSingleton for sharing between Orchestrator & DagManager (#3731)
    
    * refactoring skeleton, builds
    
    * deploy works
    
    * add javadocs
    
    * fix failing test
    
    * respond to comments
    
    * Clean up in response to next review
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../modules/core/GobblinServiceGuiceModule.java    |   3 +
 .../service/modules/orchestration/DagManager.java  |  61 ++++--
 .../modules/orchestration/Orchestrator.java        | 243 ++++-----------------
 .../scheduler/GobblinServiceJobScheduler.java      |   7 +-
 .../utils/FlowCompilationValidationHelper.java     | 196 +++++++++++++++++
 .../modules/utils/SharedFlowMetricsSingleton.java  | 104 +++++++++
 .../modules/orchestration/DagManagerFlowTest.java  |   6 +-
 .../modules/orchestration/OrchestratorTest.java    |  34 ++-
 8 files changed, 409 insertions(+), 245 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 1423e2d8c..af0c3461a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
 import 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
@@ -186,6 +187,8 @@ public class GobblinServiceGuiceModule implements Module {
     binder.bind(RequesterService.class)
         .to(NoopRequesterService.class);
 
+    binder.bind(SharedFlowMetricsSingleton.class);
+
     OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
     if (serviceConfig.isTopologyCatalogEnabled()) {
       binder.bind(TopologyCatalog.class);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 20e802bec..207c1c1dc 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,6 +17,19 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -39,25 +52,9 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-import com.typesafe.config.ConfigFactory;
-
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
@@ -77,9 +74,12 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.ExecutionStatus;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.JobStatus;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
@@ -206,8 +206,12 @@ public class DagManager extends AbstractIdleService {
   protected final Long defaultJobStartSlaTimeMillis;
   @Getter
   private final JobStatusRetriever jobStatusRetriever;
-  private final Orchestrator orchestrator;
+  private final FlowStatusGenerator flowStatusGenerator;
+  private final UserQuotaManager quotaManager;
+  private final SpecCompiler specCompiler;
+  private final boolean isFlowConcurrencyEnabled;
   private final FlowCatalog flowCatalog;
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
   private final Config config;
   private final Optional<EventSubmitter> eventSubmitter;
   private final long failedDagRetentionTime;
@@ -218,7 +222,8 @@ public class DagManager extends AbstractIdleService {
 
   private volatile boolean isActive = false;
 
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, 
Orchestrator orchestrator,
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
FlowStatusGenerator flowStatusGenerator,
       FlowCatalog flowCatalog, boolean instrumentationEnabled) {
     this.config = config;
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
@@ -240,8 +245,18 @@ public class DagManager extends AbstractIdleService {
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
     this.jobStatusRetriever = jobStatusRetriever;
-    this.orchestrator = orchestrator;
+    this.flowStatusGenerator = flowStatusGenerator;
+    this.specCompiler = 
GobblinConstructorUtils.invokeConstructor(SpecCompiler.class, 
ConfigUtils.getString(config,
+        ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
+        ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS), config);
+    this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+        ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
+    this.quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+        ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
+        config);
     this.flowCatalog = flowCatalog;
+    this.flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
+        quotaManager, eventSubmitter, flowStatusGenerator, 
isFlowConcurrencyEnabled);
     TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
     this.failedDagRetentionTime = 
timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, 
DEFAULT_FAILED_DAG_RETENTION_TIME));
   }
@@ -266,9 +281,10 @@ public class DagManager extends AbstractIdleService {
   }
 
   @Inject
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, 
Orchestrator orchestrator,
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
FlowStatusGenerator flowStatusGenerator,
       FlowCatalog flowCatalog) {
-    this(config, jobStatusRetriever, orchestrator, flowCatalog, true);
+    this(config, jobStatusRetriever, sharedFlowMetricsSingleton, 
flowStatusGenerator, flowCatalog, true);
   }
 
   /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s 
and loading of any {@link Dag}s is done
@@ -483,7 +499,8 @@ public class DagManager extends AbstractIdleService {
     try {
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
       spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
-      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag = 
orchestrator.handleChecksBeforeExecution(spec);
+      Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+          
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
       if (optionalJobExecutionPlanDag.isPresent()) {
         addDag(optionalJobExecutionPlanDag.get(), true, true);
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index b059d1744..186b6d81c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,6 +17,11 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -25,33 +30,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-
 import javax.annotation.Nonnull;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.Setter;
-
+import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.RootMetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -69,10 +59,14 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -92,33 +86,34 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   protected final MetricContext metricContext;
 
   protected final Optional<EventSubmitter> eventSubmitter;
-  private final boolean flowConcurrencyFlag;
+  private final boolean isFlowConcurrencyEnabled;
   @Getter
   private Optional<Meter> flowOrchestrationSuccessFulMeter;
   @Getter
   private Optional<Meter> flowOrchestrationFailedMeter;
   @Getter
   private Optional<Timer> flowOrchestrationTimer;
-  private Optional<Meter> skippedFlowsMeter;
   @Setter
   private FlowStatusGenerator flowStatusGenerator;
 
   private UserQuotaManager quotaManager;
+  private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
   private Optional<FlowTriggerHandler> flowTriggerHandler;
+  @Getter
+  private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
 
   private final ClassAliasResolver<SpecCompiler> aliasResolver;
 
-  private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
-
   public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager,
       Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean 
instrumentationEnabled,
-      Optional<FlowTriggerHandler> flowTriggerHandler) {
+      Optional<FlowTriggerHandler> flowTriggerHandler, 
SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
     this.dagManager = dagManager;
     this.flowStatusGenerator = flowStatusGenerator;
     this.flowTriggerHandler = flowTriggerHandler;
+    this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
     try {
       String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
       if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -142,27 +137,29 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       this.flowOrchestrationSuccessFulMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
       this.flowOrchestrationFailedMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
       this.flowOrchestrationTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
-      this.skippedFlowsMeter = 
Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
       this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
     } else {
       this.metricContext = null;
       this.flowOrchestrationSuccessFulMeter = Optional.absent();
       this.flowOrchestrationFailedMeter = Optional.absent();
       this.flowOrchestrationTimer = Optional.absent();
-      this.skippedFlowsMeter = Optional.absent();
       this.eventSubmitter = Optional.absent();
     }
-    this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+    this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
         ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
     quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
         ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
         config);
+    this.flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
+        quotaManager, eventSubmitter, flowStatusGenerator, 
isFlowConcurrencyEnabled);
   }
 
   @Inject
   public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, 
Optional<TopologyCatalog> topologyCatalog,
-      Optional<DagManager> dagManager, Optional<Logger> log, 
Optional<FlowTriggerHandler> flowTriggerHandler) {
-    this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, 
flowTriggerHandler);
+      Optional<DagManager> dagManager, Optional<Logger> log, 
Optional<FlowTriggerHandler> flowTriggerHandler,
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton) {
+    this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, 
flowTriggerHandler,
+        sharedFlowMetricsSingleton);
   }
 
 
@@ -236,16 +233,14 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
       String flowGroup = 
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
-      // Only register the metric of flows that are scheduled, run once flows 
should not be tracked indefinitely
-      if (!flowGauges.containsKey(spec.getUri().toString()) && 
flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-        String flowCompiledGaugeName =
-            MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, 
flowGroup, flowName, ServiceMetricNames.COMPILED);
-        flowGauges.put(spec.getUri().toString(), new FlowCompiledState());
-        ContextAwareGauge<Integer> gauge = 
RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () -> 
flowGauges.get(spec.getUri().toString()).state.value);
-        RootMetricContext.get().register(flowCompiledGaugeName, gauge);
-      }
-
-      if (!isExecutionPermittedHandler(flowConfig, spec, flowName, flowGroup)) 
{
+      sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, 
flowName);
+      Optional<TimingEvent> flowCompilationTimer =
+          this.eventSubmitter.transform(submitter -> new 
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+      Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+          
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 spec, flowGroup,
+              flowName);
+      if (!jobExecutionPlanDagOptional.isPresent()) {
+        Instrumented.markMeter(this.flowOrchestrationFailedMeter);
         return;
       }
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
@@ -272,21 +267,19 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, triggerEventTimestamp: {}]", flowAction,
             triggerTimestampMillis);
       } else {
-        // Compile flow spec and do corresponding checks
-        Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
-        Optional<TimingEvent> flowCompilationTimer =
-            this.eventSubmitter.transform(submitter -> new 
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
-
+        Dag<JobExecutionPlan> jobExecutionPlanDag = 
jobExecutionPlanDagOptional.get();
         if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
-          populateFlowCompilationFailedEventMessage(spec, flowMetadata);
+          
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 spec, flowMetadata);
           Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-          conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.FAILED);
+          
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+              SharedFlowMetricsSingleton.CompiledState.FAILED);
           _log.warn("Cannot determine an executor to run on for Spec: " + 
spec);
           return;
         }
-        conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SUCCESSFUL);
+        sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+            SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
 
-        addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+        
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
jobExecutionPlanDag);
         if (flowCompilationTimer.isPresent()) {
           flowCompilationTimer.get().stop(flowMetadata);
         }
@@ -336,119 +329,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
-  /**
-   * If it is a scheduled flow (and hence, does not have flowExecutionId in 
the FlowSpec) and the flow compilation is
-   * successful, retrieve the flowExecutionId from the JobSpec.
-   */
-  public void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata, 
Dag<JobExecutionPlan> jobExecutionPlanDag) {
-    
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
-        
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
-            ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
-  }
-
-  /**
-   * Checks if flowSpec disallows concurrent executions, and if so then checks 
if another instance of the flow is
-   * already running and emits a FLOW FAILED event. Otherwise, this check 
passes.
-   * @return true if caller can proceed to execute flow, false otherwise
-   * @throws IOException
-   */
-  public boolean isExecutionPermittedHandler(Config flowConfig, Spec spec, 
String flowName, String flowGroup)
-      throws IOException {
-    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, 
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, this.flowConcurrencyFlag);
-
-    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
-
-    if (!isExecutionPermitted(flowName, flowGroup, allowConcurrentExecution)) {
-      _log.warn("Another instance of flowGroup: {}, flowName: {} running; 
Skipping flow execution since "
-          + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
-      conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
-      Instrumented.markMeter(this.skippedFlowsMeter);
-      if (!((FlowSpec) 
spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-        // For ad-hoc flow, we might already increase quota, we need to 
decrease here
-        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
-          quotaManager.releaseQuota(dagNode);
-        }
-      }
-
-      // Send FLOW_FAILED event
-      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
-      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
-          + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flow spec to change this behaviour.");
-      if (this.eventSubmitter.isPresent()) {
-        new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
-      }
-      return false;
-    }
-    return true;
-  }
-
-
-  /**
-   * Abstraction used to populate the message of and emit a FlowCompileFailed 
event for the Orchestrator.
-   * @param spec
-   * @param flowMetadata
-   */
-  public void populateFlowCompilationFailedEventMessage(Spec spec,
-      Map<String, String> flowMetadata) {
-    // For scheduled flows, we do not insert the flowExecutionId into the 
FlowSpec. As a result, if the flow
-    // compilation fails (i.e. we are unable to find a path), the metadata 
will not have flowExecutionId.
-    // In this case, the current time is used as the flow executionId.
-    
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
-        Long.toString(System.currentTimeMillis()));
-
-    String message = "Flow was not compiled successfully.";
-    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
-      message = message + " Compilation errors encountered: " + ((FlowSpec) 
spec).getCompilationErrors();
-    }
-    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
-
-    Optional<TimingEvent> flowCompileFailedTimer = 
eventSubmitter.transform(submitter ->
-        new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
-
-    if (flowCompileFailedTimer.isPresent()) {
-      flowCompileFailedTimer.get().stop(flowMetadata);
-    }
-  }
-
-  /**
-   * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
-   * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
-   * caller.
-   * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
-   */
-  public Optional<Dag<JobExecutionPlan>> handleChecksBeforeExecution(FlowSpec 
flowSpec)
-      throws IOException, InterruptedException {
-    Config flowConfig = flowSpec.getConfig();
-    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-    if (!isExecutionPermittedHandler(flowConfig, flowSpec, flowName, 
flowGroup)) {
-      return Optional.absent();
-    }
-
-    //Wait for the SpecCompiler to become healthy.
-    this.getSpecCompiler().awaitHealthy();
-
-    Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
-    Optional<TimingEvent> flowCompilationTimer =
-        this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
-    Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-
-    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
-      populateFlowCompilationFailedEventMessage(flowSpec, flowMetadata);
-      return Optional.absent();
-    }
-
-    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
-    if (flowCompilationTimer.isPresent()) {
-      flowCompilationTimer.get().stop(flowMetadata);
-    }
-    return Optional.of(jobExecutionPlanDag);
-  }
-
   public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException, 
InterruptedException {
-    Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag = 
handleChecksBeforeExecution(flowSpec);
+    Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
+        
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec);
     if (optionalJobExecutionPlanDag.isPresent()) {
       submitFlowToDagManager(flowSpec, optionalJobExecutionPlanDag.get());
+    } else {
+      Instrumented.markMeter(this.flowOrchestrationFailedMeter);
     }
   }
 
@@ -469,22 +356,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     }
   }
 
-  /**
-   * Check if a FlowSpec instance is allowed to run.
-   *
-   * @param flowName
-   * @param flowGroup
-   * @param allowConcurrentExecution
-   * @return true if the {@link FlowSpec} allows concurrent executions or if 
no other instance of the flow is currently RUNNING.
-   */
-  private boolean isExecutionPermitted(String flowName, String flowGroup, 
boolean allowConcurrentExecution) {
-    if (allowConcurrentExecution) {
-      return true;
-    } else {
-      return !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
-    }
-  }
-
   public void remove(Spec spec, Properties headers) throws IOException {
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
@@ -529,18 +400,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     }
   }
 
-  /**
-   * Updates the flowgauge related to the spec if the gauge is being tracked 
for the flow
-   * @param spec FlowSpec to be updated
-   * @param state desired state to set the gauge
-   */
-  private void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState 
state) {
-    if (this.flowGauges.containsKey(spec.getUri().toString())) {
-      this.flowGauges.get(spec.getUri().toString()).setState(state);
-    }
-  }
-
-
   @Nonnull
   @Override
   public MetricContext getMetricContext() {
@@ -566,22 +425,4 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   public void switchMetricContext(MetricContext context) {
     throw new UnsupportedOperationException();
   }
-
-  @Setter
-  private static class FlowCompiledState {
-    private CompiledState state = CompiledState.UNKNOWN;
-  }
-
-  private enum CompiledState {
-    FAILED(-1),
-    UNKNOWN(0),
-    SUCCESSFUL(1),
-    SKIPPED(2);
-
-    public int value;
-
-    CompiledState(int value) {
-      this.value = value;
-    }
-  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d860ed485..f71093f52 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -31,6 +31,7 @@ import java.util.TimeZone;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.helix.HelixManager;
 import org.quartz.CronExpression;
 import org.quartz.DisallowConcurrentExecution;
@@ -208,10 +209,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   public GobblinServiceJobScheduler(String serviceName, Config config, 
FlowStatusGenerator flowStatusGenerator,
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
Optional<TopologyCatalog> topologyCatalog,
       Optional<DagManager> dagManager, Optional<UserQuotaManager> 
quotaManager, SchedulerService schedulerService,
-      Optional<Logger> log, boolean warmStandbyEnabled, Optional 
<FlowTriggerHandler> flowTriggerHandler)
+      Optional<Logger> log, boolean warmStandbyEnabled, Optional 
<FlowTriggerHandler> flowTriggerHandler,
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
       throws Exception {
     this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
-        new Orchestrator(config, flowStatusGenerator, topologyCatalog, 
dagManager, log, flowTriggerHandler),
+        new Orchestrator(config, flowStatusGenerator, topologyCatalog, 
dagManager, log, flowTriggerHandler,
+            sharedFlowMetricsSingleton),
         schedulerService, quotaManager, log, warmStandbyEnabled, 
flowTriggerHandler);
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
new file mode 100644
index 000000000..747ebc2bc
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
+ * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
+ * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart or leadership change the DagManager has to perform validations before executing any launch actions the
+ * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
+ * the DagManager and Orchestrator, this class holds the common functionality. Every stateful collaborator it relies on
+ * (spec compiler, quota manager, flow status generator, shared metrics, event submitter) is supplied by the caller
+ * when the helper is instantiated.
+ * Note: We expect further refactoring to be done to the DagManager in a later stage of multi-active development, so we
+ * do not attempt a major reorganization as abstractions may change.
+ */
+@Slf4j
+@Data
+public final class FlowCompilationValidationHelper {
+  private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
+  private final SpecCompiler specCompiler;
+  private final UserQuotaManager quotaManager;
+  private final Optional<EventSubmitter> eventSubmitter;
+  private final FlowStatusGenerator flowStatusGenerator;
+  // Default for flow.allowConcurrentExecution when a flow spec does not set it explicitly
+  private final boolean isFlowConcurrencyEnabled;
+
+  /**
+   * For a given flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and that the
+   * flowSpec can be compiled. If both pre-conditions hold, a JobExecutionPlan dag is constructed and returned to the
+   * caller and the FLOW_COMPILED timer (when an event submitter is present) is stopped. If compilation yields no
+   * plan, a FLOW_COMPILE_FAILED event is emitted instead.
+   * @param flowSpec flow to validate and compile
+   * @return jobExecutionPlan dag if one can be constructed for the given flowSpec, otherwise absent
+   * @throws IOException propagated from {@link #validateAndHandleConcurrentExecution}
+   * @throws InterruptedException if interrupted while waiting for the SpecCompiler to become healthy
+   */
+  public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowSpec)
+      throws IOException, InterruptedException {
+    Config flowConfig = flowSpec.getConfig();
+    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+    // Wait for the SpecCompiler to become healthy.
+    specCompiler.awaitHealthy();
+
+    Optional<TimingEvent> flowCompilationTimer =
+        this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+    Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
+        validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, flowName);
+    Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+
+    // Absent means the execution was not permitted; validateAndHandleConcurrentExecution already emitted FLOW_FAILED.
+    if (!jobExecutionPlanDagOptional.isPresent()) {
+      return Optional.absent();
+    }
+
+    // A present dag is never null (validateAndHandleConcurrentExecution normalizes null to an empty dag),
+    // so an empty dag is what signals a compilation failure.
+    if (jobExecutionPlanDagOptional.get().isEmpty()) {
+      populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
+      return Optional.absent();
+    }
+
+    addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDagOptional.get());
+    if (flowCompilationTimer.isPresent()) {
+      flowCompilationTimer.get().stop(flowMetadata);
+    }
+    return jobExecutionPlanDagOptional;
+  }
+
+  /**
+   * Compiles the flow and, if the flowSpec disallows concurrent executions, checks whether another instance of the
+   * flow is already running. If so, the execution is skipped: the shared skipped-flow metrics are updated, quota that
+   * may already have been taken for an ad-hoc flow is released, and a FLOW_FAILED event is emitted.
+   * @param flowConfig config of the flow, consulted for flow.allowConcurrentExecution
+   * @param spec the FlowSpec to compile
+   * @param flowGroup group of the flow
+   * @param flowName name of the flow
+   * @return the compiled dag if the caller is allowed to execute the flow, else an absent Optional. A present dag is
+   *         never null; it is empty when the compiler produced no plan.
+   * @throws IOException propagated from the collaborators invoked here
+   */
+  public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, Spec spec,
+      String flowGroup, String flowName) throws IOException {
+    boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig,
+        ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, isFlowConcurrencyEnabled);
+
+    Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec);
+    if (jobExecutionPlanDag == null) {
+      // Callers treat a null plan as a compilation failure, but Guava's Optional.of(null) throws NullPointerException
+      // (and the quota release below would NPE as well). Normalize to an empty dag, which callers also treat as a
+      // compilation failure.
+      jobExecutionPlanDag = new Dag<>(new java.util.ArrayList<>());
+    }
+
+    if (isExecutionPermitted(flowStatusGenerator, flowName, flowGroup, allowConcurrentExecution)) {
+      return Optional.of(jobExecutionPlanDag);
+    } else {
+      log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+          + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+      sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+          SharedFlowMetricsSingleton.CompiledState.SKIPPED);
+      Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
+      if (!isScheduledFlow((FlowSpec) spec)) {
+        // For ad-hoc flow, we might already increase quota, we need to decrease here
+        for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+          quotaManager.releaseQuota(dagNode);
+        }
+      }
+
+      // Send FLOW_FAILED event
+      Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec);
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because another instance is running and concurrent "
+          + "executions are disabled. Set flow.allowConcurrentExecution to true in the flow spec to change this behaviour.");
+      if (eventSubmitter.isPresent()) {
+        new TimingEvent(eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+      }
+      return Optional.absent();
+    }
+  }
+
+  /**
+   * Check if a FlowSpec instance is allowed to run.
+   *
+   * @param flowStatusGenerator used to look up whether the flow is currently running
+   * @param flowName name of the flow
+   * @param flowGroup group of the flow
+   * @param allowConcurrentExecution whether the flow permits concurrent executions
+   * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
+   */
+  private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowName, String flowGroup,
+      boolean allowConcurrentExecution) {
+    return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+  }
+
+  /**
+   * Populates the message of, and emits, a FLOW_COMPILE_FAILED event (only when an event submitter is present).
+   * @param eventSubmitter submitter used to emit the event; nothing is emitted when absent
+   * @param spec the FlowSpec whose compilation failed; must be a {@link FlowSpec}
+   * @param flowMetadata event metadata; mutated to carry a flowExecutionId (if missing) and the failure message
+   */
+  public static void populateFlowCompilationFailedEventMessage(Optional<EventSubmitter> eventSubmitter, Spec spec,
+      Map<String, String> flowMetadata) {
+    // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow
+    // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId.
+    // In this case, the current time is used as the flow executionId.
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        Long.toString(System.currentTimeMillis()));
+
+    String message = "Flow was not compiled successfully.";
+    if (!((FlowSpec) spec).getCompilationErrors().isEmpty()) {
+      message = message + " Compilation errors encountered: " + ((FlowSpec) spec).getCompilationErrors();
+    }
+    flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
+
+    Optional<TimingEvent> flowCompileFailedTimer = eventSubmitter.transform(submitter ->
+        new TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
+
+    if (flowCompileFailedTimer.isPresent()) {
+      flowCompileFailedTimer.get().stop(flowMetadata);
+    }
+  }
+
+  /**
+   * If it is a scheduled flow (and hence, does not have flowExecutionId in the FlowSpec) and the flow compilation is
+   * successful, retrieve the flowExecutionId from the JobSpec of the first dag node.
+   * Precondition: jobExecutionPlanDag must be non-empty.
+   */
+  public static void addFlowExecutionIdIfAbsent(Map<String,String> flowMetadata,
+      Dag<JobExecutionPlan> jobExecutionPlanDag) {
+    flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+        jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(
+            ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+  }
+
+  /**
+   * Return true if the spec contains a schedule, false otherwise.
+   */
+  public static boolean isScheduledFlow(FlowSpec spec) {
+    return spec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY);
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java
new file mode 100644
index 000000000..ab9bb5f97
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/SharedFlowMetricsSingleton.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.utils;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import java.net.URI;
+import java.util.Map;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Data;
+import lombok.Setter;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Class to store flow related metrics shared between the {@link Orchestrator} and
+ * {@link org.apache.gobblin.service.modules.orchestration.DagManager} so we can easily track all flow compilations
+ * and skipped flows handled between the two in a common place. Because one instance is shared by both components,
+ * and the registered gauges may be read from other threads, per-flow state is kept in a concurrent map.
+ */
+@Singleton
+@Data
+public class SharedFlowMetricsSingleton {
+  protected final MetricContext metricContext;
+  // Latest compiled state of each scheduled flow that has a registered gauge, keyed by flow spec URI
+  private Map<URI, FlowCompiledState> flowGaugeStateBySpecUri = Maps.newConcurrentMap();
+  private Optional<Meter> skippedFlowsMeter;
+
+  /** Mutable holder for the latest {@link CompiledState} of a flow; read by that flow's compiled gauge. */
+  @Setter
+  public static class FlowCompiledState {
+    // volatile so an update made by one component is visible to threads reading the gauge
+    private volatile CompiledState state = CompiledState.UNKNOWN;
+  }
+
+  /** Compilation outcome of a flow; {@link #value} is the number reported by the flow's gauge. */
+  public enum CompiledState {
+    FAILED(-1),
+    UNKNOWN(0),
+    SUCCESSFUL(1),
+    SKIPPED(2);
+
+    public final int value;
+
+    CompiledState(int value) {
+      this.value = value;
+    }
+  }
+
+  /**
+   * Creates the metric context from the service config and the skipped-flows meter within it.
+   * @param config service configuration
+   */
+  @Inject
+  public SharedFlowMetricsSingleton(Config config) {
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config),
+        SharedFlowMetricsSingleton.class);
+    this.skippedFlowsMeter = Optional.of(metricContext.contextAwareMeter(ServiceMetricNames.SKIPPED_FLOWS));
+  }
+
+  /**
+   * Adds a new FlowGauge to the root metric context if one does not already exist for this flow spec. Only flows
+   * whose config has a schedule get a gauge.
+   * @param spec flow spec whose URI keys the gauge state
+   * @param flowConfig config of the flow, checked for a schedule
+   * @param flowGroup group of the flow, used in the gauge name
+   * @param flowName name of the flow, used in the gauge name
+   */
+  public void addFlowGauge(Spec spec, Config flowConfig, String flowGroup, String flowName) {
+    // Only register the metric of flows that are scheduled, run once flows should not be tracked indefinitely
+    if (!flowConfig.hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+      return;
+    }
+    URI specUri = spec.getUri();
+    // putIfAbsent makes check-and-insert a single step, so concurrent callers register the gauge at most once
+    if (flowGaugeStateBySpecUri.putIfAbsent(specUri, new FlowCompiledState()) != null) {
+      return;
+    }
+    String flowCompiledGaugeName =
+        MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, flowGroup, flowName, ServiceMetricNames.COMPILED);
+    ContextAwareGauge<Integer> gauge = RootMetricContext.get().newContextAwareGauge(flowCompiledGaugeName, () -> {
+      // Report UNKNOWN rather than throwing if the entry is no longer tracked
+      FlowCompiledState compiledState = flowGaugeStateBySpecUri.get(specUri);
+      return compiledState == null ? CompiledState.UNKNOWN.value : compiledState.state.value;
+    });
+    RootMetricContext.get().register(flowCompiledGaugeName, gauge);
+  }
+
+  /**
+   * Updates the flow gauge related to the spec if the gauge is being tracked for the flow; otherwise does nothing.
+   * @param spec FlowSpec to be updated
+   * @param state desired state to set the gauge
+   */
+  public void conditionallyUpdateFlowGaugeSpecState(Spec spec, CompiledState state) {
+    FlowCompiledState compiledState = flowGaugeStateBySpecUri.get(spec.getUri());
+    if (compiledState != null) {
+      compiledState.setState(state);
+    }
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 603be2a17..5445a2096 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -342,8 +344,8 @@ class CancelPredicate implements Predicate<Void> {
 class MockedDagManager extends DagManager {
 
+  /**
+   * Test DagManager built with the job status retriever from {@code createJobStatusRetriever()} and Mockito mocks for
+   * the shared flow metrics singleton, flow status generator and flow catalog.
+   */
   public MockedDagManager(Config config, boolean instrumentationEnabled) {
-    super(config, createJobStatusRetriever(), 
Mockito.mock(Orchestrator.class), Mockito.mock(FlowCatalog.class),
-        instrumentationEnabled);
+    super(config, createJobStatusRetriever(), 
Mockito.mock(SharedFlowMetricsSingleton.class),
+        Mockito.mock(FlowStatusGenerator.class), 
Mockito.mock(FlowCatalog.class), instrumentationEnabled);
   }
 
   private static JobStatusRetriever createJobStatusRetriever() {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index fafcc9605..2d3f4b3f9 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -18,34 +18,23 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.typesafe.config.Config;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
-
 import org.apache.commons.io.FileUtils;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.typesafe.config.Config;
-
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
@@ -53,9 +42,17 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
 
@@ -111,7 +108,8 @@ public class OrchestratorTest {
     this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
     this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
         this.mockStatusGenerator, Optional.of(this.topologyCatalog), 
Optional.<DagManager>absent(), Optional.of(logger),
-         Optional.of(this._mockFlowTriggerHandler));
+         Optional.of(this._mockFlowTriggerHandler), new 
SharedFlowMetricsSingleton(
+             ConfigUtils.propertiesToConfig(orchestratorProperties)));
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
     // Start application
@@ -332,7 +330,7 @@ public class OrchestratorTest {
 
   @Test (dependsOnMethods = "deleteFlowSpec")
   public void doNotRegisterMetricsAdhocFlows() throws Exception {
-    MetricContext metricContext = this.orchestrator.getMetricContext();
+    MetricContext metricContext = 
this.orchestrator.getSharedFlowMetricsSingleton().getMetricContext();
     this.topologyCatalog.getInitComplete().countDown(); // unblock 
orchestration
     Properties flowProps = new Properties();
     flowProps.setProperty(ConfigurationKeys.FLOW_NAME_KEY, "flow0");

Reply via email to