This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new be12fd043 [GOBBLIN-1983] Remove Optionals to make DagManager, 
EventSubmitter, and TopologyCatalog required for GaaS operation (#3855)
be12fd043 is described below

commit be12fd043a129e4b6c6445cbe98afb050c69c62c
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Jan 12 00:34:14 2024 -0800

    [GOBBLIN-1983] Remove Optionals to make DagManager, EventSubmitter, and 
TopologyCatalog required for GaaS operation (#3855)
    
    Remove Optionals to make DagManager, EventSubmitter, and TopologyCatalog 
required for GaaS operation
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |   3 -
 .../apache/gobblin/instrumented/Instrumented.java  |  12 +-
 .../gobblin/metrics/event/EventSubmitter.java      |   2 +-
 .../org/apache/gobblin/service/FlowStatusTest.java |   2 +-
 .../service/monitoring/FlowStatusGenerator.java    |   2 +-
 .../service/monitoring/JobStatusRetriever.java     |  37 ++---
 .../monitoring/FlowStatusGeneratorTest.java        |  11 +-
 .../modules/core/GobblinServiceConfiguration.java  |  10 --
 .../modules/core/GobblinServiceGuiceModule.java    |  35 ++---
 .../modules/core/GobblinServiceManager.java        |  41 +++---
 .../service/modules/orchestration/DagManager.java  | 125 +++++++---------
 .../modules/orchestration/DagManagerUtils.java     |   7 +-
 .../modules/orchestration/Orchestrator.java        | 158 ++++++---------------
 .../scheduler/GobblinServiceJobScheduler.java      |  54 +++----
 .../utils/FlowCompilationValidationHelper.java     |  26 ++--
 .../service/monitoring/FsJobStatusRetriever.java   |   5 +-
 .../monitoring/LocalFsJobStatusRetriever.java      |   3 +-
 .../monitoring/MysqlJobStatusRetriever.java        |   5 +-
 .../gobblin/service/GobblinServiceManagerTest.java |  34 +++--
 .../service/modules/core/GobblinServiceHATest.java |  11 +-
 .../modules/core/GobblinServiceRedirectTest.java   |  10 +-
 .../modules/orchestration/DagManagerFlowTest.java  |   6 +-
 .../modules/orchestration/DagManagerTest.java      |   2 +-
 .../modules/orchestration/OrchestratorTest.java    |  45 +++---
 .../scheduler/GobblinServiceJobSchedulerTest.java  |  21 ++-
 .../monitoring/FsJobStatusRetrieverTest.java       |  19 ++-
 .../FsJobStatusRetrieverTestWithoutDagManager.java | 116 ---------------
 .../service/monitoring/JobStatusRetrieverTest.java |   2 +-
 .../monitoring/MysqlJobStatusRetrieverTest.java    |  15 +-
 ...sqlJobStatusRetrieverTestWithoutDagManager.java | 128 -----------------
 30 files changed, 286 insertions(+), 661 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 21b32b58c..2cfb6a017 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -29,7 +29,6 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS = 
"org.apache.gobblin.service.modules.orchestration.Orchestrator";
 
   // Gobblin Service Manager Keys
-  public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
   public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
   public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
   public static final String GOBBLIN_SERVICE_INSTANCE_NAME = 
GOBBLIN_SERVICE_PREFIX + "instance.name";
@@ -37,8 +36,6 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
   public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY 
= GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
   public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
-  public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
-  public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = 
false;
   public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
   public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
   public static final String 
GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + 
"multiActiveScheduler.enabled";
diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
index f2aa10303..33f4b56a8 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
@@ -25,8 +25,6 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.Nonnull;
-
 import org.apache.commons.lang3.StringUtils;
 
 import com.codahale.metrics.Meter;
@@ -41,6 +39,8 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 
+import javax.annotation.Nonnull;
+
 import org.apache.gobblin.Constructs;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -247,6 +247,10 @@ public class Instrumented implements Instrumentable, 
Closeable {
     });
   }
 
+  public static void updateTimer(Timer timer, final long duration, final 
TimeUnit unit) {
+    updateTimer(Optional.of(timer), duration, unit);
+  }
+
   /**
    * Marks a meter only if it is defined.
    * @param meter an Optional&lt;{@link com.codahale.metrics.Meter}&gt;
@@ -255,6 +259,10 @@ public class Instrumented implements Instrumentable, 
Closeable {
     markMeter(meter, 1);
   }
 
+  public static void markMeter(Meter meter) {
+    markMeter(Optional.of(meter), 1);
+  }
+
   /**
    * Marks a meter only if it is defined.
    * @param meter an Optional&lt;{@link com.codahale.metrics.Meter}&gt;
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 891f980b7..480542560 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -179,7 +179,7 @@ public class EventSubmitter {
       }
 
       // Timestamp is set by metric context.
-      this.metricContext.get().submitEvent(new GobblinTrackingEvent(0l, 
this.namespace, name, finalMetadata));
+      this.metricContext.get().submitEvent(new GobblinTrackingEvent(0L, 
this.namespace, name, finalMetadata));
     }
   }
 
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index c507b0310..4888a2c23 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -54,7 +54,7 @@ public class FlowStatusTest {
   class TestJobStatusRetriever extends JobStatusRetriever {
 
     protected TestJobStatusRetriever(MultiContextIssueRepository 
issueRepository) {
-      super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED, 
issueRepository);
+      super(issueRepository);
     }
 
     @Override
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 8b991b985..0bf8b71ea 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -118,7 +118,7 @@ public class FlowStatusGenerator {
     List<JobStatus> jobStatuses = 
ImmutableList.copyOf(retainStatusOfAnyFlowOrJobMatchingTag(
         jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, 
flowExecutionId), tag));
     ExecutionStatus flowExecutionStatus =
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(),
 jobStatuses.iterator());
+        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator());
     return jobStatuses.iterator().hasNext()
         ? new FlowStatus(flowName, flowGroup, flowExecutionId, 
jobStatuses.iterator(), flowExecutionStatus) : null;
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index e5845fe37..750834092 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -19,12 +19,10 @@ package org.apache.gobblin.service.monitoring;
 
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Supplier;
@@ -60,15 +58,12 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
 
   @Getter
   protected final MetricContext metricContext;
-  @Getter
-  protected final Boolean dagManagerEnabled;
 
   private final MultiContextIssueRepository issueRepository;
 
-  protected JobStatusRetriever(boolean dagManagerEnabled, 
MultiContextIssueRepository issueRepository) {
+  protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
     this.issueRepository = Objects.requireNonNull(issueRepository);
-    this.dagManagerEnabled = dagManagerEnabled;
   }
 
   public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String 
flowName, String flowGroup,
@@ -186,7 +181,7 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
           
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
       ).collect(Collectors.toList())));
       return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), 
exec.getFlowExecutionId(), jobStatuses.iterator(),
-            getFlowStatusFromJobStatuses(dagManagerEnabled, 
jobStatuses.iterator()));
+            getFlowStatusFromJobStatuses(jobStatuses.iterator()));
     }).collect(Collectors.toList());
   }
 
@@ -227,31 +222,15 @@ public abstract class JobStatusRetriever implements 
LatestFlowExecutionIdTracker
         && jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && 
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
   }
 
-  public static ExecutionStatus getFlowStatusFromJobStatuses(boolean 
dagManagerEnabled, Iterator<JobStatus> jobStatusIterator) {
+  public static ExecutionStatus 
getFlowStatusFromJobStatuses(Iterator<JobStatus> jobStatusIterator) {
     ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
 
-    if (dagManagerEnabled) {
-      while (jobStatusIterator.hasNext()) {
-        JobStatus jobStatus = jobStatusIterator.next();
-        // Check if this is the flow status instead of a single job status
-        if (JobStatusRetriever.isFlowStatus(jobStatus)) {
-          flowExecutionStatus = 
ExecutionStatus.valueOf(jobStatus.getEventName());
-        }
-      }
-    } else {
-      Set<ExecutionStatus> jobStatuses = new HashSet<>();
-      while (jobStatusIterator.hasNext()) {
-        JobStatus jobStatus = jobStatusIterator.next();
-        // because in absence of DagManager we do not get all flow level 
events, we will ignore the flow level events
-        // we actually get and purely calculate flow status based on flow 
statuses.
-        if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
-          jobStatuses.add(ExecutionStatus.valueOf(jobStatus.getEventName()));
-        }
+    while (jobStatusIterator.hasNext()) {
+      JobStatus jobStatus = jobStatusIterator.next();
+      // Check if this is the flow status instead of a single job status
+      if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+        flowExecutionStatus = 
ExecutionStatus.valueOf(jobStatus.getEventName());
       }
-
-      List<ExecutionStatus> statusesInDescendingSalience = 
ImmutableList.of(ExecutionStatus.FAILED, ExecutionStatus.CANCELLED,
-          ExecutionStatus.RUNNING, ExecutionStatus.ORCHESTRATED, 
ExecutionStatus.COMPLETE);
-      flowExecutionStatus = 
statusesInDescendingSalience.stream().filter(jobStatuses::contains).findFirst().orElse(ExecutionStatus.$UNKNOWN);
     }
 
     return flowExecutionStatus;
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 1c5c534be..380177936 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -77,7 +77,6 @@ public class FlowStatusGeneratorTest {
         
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
     jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, 
flowStatus).iterator();
     when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
-    when(jobStatusRetriever.getDagManagerEnabled()).thenReturn(true);
     Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
   }
 
@@ -109,9 +108,9 @@ public class FlowStatusGeneratorTest {
 
     // IMPORTANT: result invariants to honor - ordered by ascending flowName, 
all of same flowName adjacent, therein descending flowExecutionId
     // NOTE: Three copies of FlowStatus are needed for repeated use, due to 
mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator`
-    FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, 
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
-    FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, 
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
-    FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, 
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
+    FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, 
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
+    FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, 
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
+    FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, 
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
     
Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions("myFlowGroup",
 2))
         .thenReturn(Collections.singletonList(flowStatus), 
Collections.singletonList(flowStatus2), 
Collections.singletonList(flowStatus3)); // (for three invocations)
 
@@ -138,9 +137,9 @@ public class FlowStatusGeneratorTest {
         Arrays.asList(f0jsmDep2)));
   }
 
-  private FlowStatus createFlowStatus(String flowGroup, String flowName, long 
flowExecutionId, List<JobStatus> jobStatuses, JobStatusRetriever 
jobStatusRetriever) {
+  private FlowStatus createFlowStatus(String flowGroup, String flowName, long 
flowExecutionId, List<JobStatus> jobStatuses) {
     return new FlowStatus(flowName, flowGroup, flowExecutionId, 
jobStatuses.iterator(),
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(),
 jobStatuses.iterator()));
+        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator()));
   }
 
   private JobStatus createFlowJobStatus(String flowGroup, String flowName, 
long flowExecutionId, ExecutionStatus status) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 03081b3ba..1998b541d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -46,9 +46,6 @@ public class GobblinServiceConfiguration {
   @Getter
   private final boolean isMultiActiveSchedulerEnabled;
 
-  @Getter
-  private final boolean isTopologyCatalogEnabled;
-
   @Getter
   private final boolean isFlowCatalogEnabled;
 
@@ -64,9 +61,6 @@ public class GobblinServiceConfiguration {
   @Getter
   private final boolean isGitConfigMonitorEnabled;
 
-  @Getter
-  private final boolean isDagManagerEnabled;
-
   @Getter
   private final boolean isJobStatusMonitorEnabled;
 
@@ -93,8 +87,6 @@ public class GobblinServiceConfiguration {
     this.innerConfig = Objects.requireNonNull(config, "Config cannot be null");
     this.serviceWorkDir = serviceWorkDir;
 
-    isTopologyCatalogEnabled =
-        ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true);
     isFlowCatalogEnabled =
         ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);
 
@@ -113,8 +105,6 @@ public class GobblinServiceConfiguration {
     this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false);
 
     this.isHelixManagerEnabled = 
config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
-    this.isDagManagerEnabled =
-        ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED);
     this.isJobStatusMonitorEnabled =
         ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true);
     this.isSchedulerEnabled =
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index af0c3461a..e108dec14 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -19,18 +19,6 @@ package org.apache.gobblin.service.modules.core;
 
 import java.util.Objects;
 
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
-import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
-import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
-import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
-import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
-import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
-import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
-import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
-import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
-import 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
-import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,13 +37,18 @@ import com.typesafe.config.Config;
 import javax.inject.Singleton;
 
 import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
 import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import 
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
 import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
 import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
@@ -75,17 +68,24 @@ import 
org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
+import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
+import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
 import 
org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
-import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
+import 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
@@ -190,9 +190,7 @@ public class GobblinServiceGuiceModule implements Module {
     binder.bind(SharedFlowMetricsSingleton.class);
 
     OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
-    if (serviceConfig.isTopologyCatalogEnabled()) {
-      binder.bind(TopologyCatalog.class);
-    }
+    binder.bind(TopologyCatalog.class);
 
     if (serviceConfig.isTopologySpecFactoryEnabled()) {
       binder.bind(TopologySpecFactory.class)
@@ -200,10 +198,7 @@ public class GobblinServiceGuiceModule implements Module {
               ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, 
ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY));
     }
 
-    OptionalBinder.newOptionalBinder(binder, DagManager.class);
-    if (serviceConfig.isDagManagerEnabled()) {
-      binder.bind(DagManager.class);
-    }
+    binder.bind(DagManager.class);
 
     OptionalBinder.newOptionalBinder(binder, HelixManager.class);
     if (serviceConfig.isHelixManagerEnabled()) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 9212eae91..737c13876 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -31,9 +31,6 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.ObjectUtils;
-import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
-import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
-import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -97,9 +94,12 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
 import org.apache.gobblin.util.ConfigUtils;
@@ -188,9 +188,10 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   @Inject(optional = true)
   protected GitConfigMonitor gitConfigMonitor;
 
-  @Inject(optional = true)
+  @Inject
   @Getter
-  protected DagManager dagManager;
+  @VisibleForTesting
+  public DagManager dagManager;
 
   @Inject(optional = true)
   protected KafkaJobStatusMonitor jobStatusMonitor;
@@ -317,12 +318,10 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
         // TODO: surround by try/catch to disconnect from Helix and fail the 
leader transition if DagManager is not
         // transitioned properly
-        if (configuration.isDagManagerEnabled()) {
-          //Activate DagManager only if TopologyCatalog is initialized. If 
not; skip activation.
-          if (this.topologyCatalog.getInitComplete().getCount() == 0) {
-            this.dagManager.setActive(true);
-            this.eventBus.register(this.dagManager);
-          }
+        //Activate DagManager only if TopologyCatalog is initialized. If not; 
skip activation.
+        if (this.topologyCatalog.getInitComplete().getCount() == 0) {
+          this.dagManager.setActive(true);
+          this.eventBus.register(this.dagManager);
         }
 
         if (configuration.isOnlyAnnounceLeader()) {
@@ -346,10 +345,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
           this.gitConfigMonitor.setActive(false);
         }
 
-        if (configuration.isDagManagerEnabled()) {
-          this.dagManager.setActive(false);
-          this.eventBus.unregister(this.dagManager);
-        }
+        this.dagManager.setActive(false);
+        this.eventBus.unregister(this.dagManager);
 
         if (configuration.isOnlyAnnounceLeader()) {
           this.d2Announcer.markDownServer();
@@ -359,9 +356,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   }
 
   private void registerServicesInLauncher(){
-    if (configuration.isTopologyCatalogEnabled()) {
-      this.serviceLauncher.addService(topologyCatalog);
-    }
+    this.serviceLauncher.addService(topologyCatalog);
 
     if (configuration.isFlowCatalogEnabled()) {
       this.serviceLauncher.addService(flowCatalog);
@@ -371,9 +366,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       }
     }
 
-    if (configuration.isDagManagerEnabled()) {
-      this.serviceLauncher.addService(dagManager);
-    }
+    this.serviceLauncher.addService(dagManager);
 
     this.serviceLauncher.addService(databaseManager);
     this.serviceLauncher.addService(issueRepository);
@@ -526,10 +519,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
     //Activate the DagManager service, after the topologyCatalog has been 
initialized.
     if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
-      if (configuration.isDagManagerEnabled()) {
-        this.dagManager.setActive(true);
-        this.eventBus.register(this.dagManager);
-      }
+      this.dagManager.setActive(true);
+      this.eventBus.register(this.dagManager);
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 3b422c55b..ca38eda0c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,20 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -51,9 +37,26 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
+
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
@@ -91,7 +94,7 @@ import static org.apache.gobblin.service.ExecutionStatus.*;
 
 /**
  * This class implements a manager to manage the life cycle of a {@link Dag}. 
A {@link Dag} is submitted to the
- * {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. On 
receiving a {@link Dag}, the
+ * {@link DagManager} by the {@link Orchestrator#orchestrate} method. On 
receiving a {@link Dag}, the
  * {@link DagManager} first persists the {@link Dag} to the {@link 
DagStateStore}, and then submits it to the specific
  * {@link DagManagerThread}'s {@link BlockingQueue} based on the 
flowExecutionId of the Flow.
  * This guarantees that each {@link Dag} received by the {@link DagManager} 
can be recovered in case of a leadership
@@ -194,10 +197,8 @@ public class DagManager extends AbstractIdleService {
   DagManagerThread[] dagManagerThreads;
 
   private final ScheduledExecutorService scheduledExecutorPool;
-  private final boolean instrumentationEnabled;
   private DagStateStore dagStateStore;
   private Map<URI, TopologySpec> topologySpecMap;
-  private int houseKeepingThreadInitialDelay = 
INITIAL_HOUSEKEEPING_THREAD_DELAY;
   @Getter
   private ScheduledExecutorService houseKeepingThreadPool;
 
@@ -215,7 +216,7 @@ public class DagManager extends AbstractIdleService {
   private final FlowCatalog flowCatalog;
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
   private final Config config;
-  private final Optional<EventSubmitter> eventSubmitter;
+  private final EventSubmitter eventSubmitter;
   private final long failedDagRetentionTime;
   private final DagManagerMetrics dagManagerMetrics;
 
@@ -226,9 +227,10 @@ public class DagManager extends AbstractIdleService {
 
   private volatile boolean isActive = false;
 
+  @Inject
   public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
       SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
FlowStatusGenerator flowStatusGenerator,
-      FlowCatalog flowCatalog, boolean instrumentationEnabled) {
+      FlowCatalog flowCatalog) {
     this.config = config;
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, 
DEFAULT_NUM_THREADS);
     this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[]) 
initializeDagQueue(this.numThreads);
@@ -237,14 +239,8 @@ public class DagManager extends AbstractIdleService {
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.retentionPollingInterval = ConfigUtils.getInt(config, 
FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
-    this.instrumentationEnabled = instrumentationEnabled;
-    MetricContext metricContext = null;
-    if (instrumentationEnabled) {
-      metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
-      this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
-    } else {
-      this.eventSubmitter = Optional.absent();
-    }
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
     this.dagManagerMetrics = new DagManagerMetrics();
     TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, 
JOB_START_SLA_UNITS, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
     this.defaultJobStartSlaTimeMillis = 
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, 
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
@@ -284,13 +280,6 @@ public class DagManager extends AbstractIdleService {
     return queue;
   }
 
-  @Inject
-  public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
-      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
FlowStatusGenerator flowStatusGenerator,
-      FlowCatalog flowCatalog) {
-    this(config, jobStatusRetriever, sharedFlowMetricsSingleton, 
flowStatusGenerator, flowCatalog, true);
-  }
-
   /** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s 
and loading of any {@link Dag}s is done
    * during leadership change.
    */
@@ -340,13 +329,11 @@ public class DagManager extends AbstractIdleService {
   }
 
   private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
-    if (this.eventSubmitter.isPresent()) {
-      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
-        JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
-        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-        
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
-        jobExecutionPlan.setExecutionStatus(PENDING);
-      }
+    for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+      JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
+      Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+      new TimingEvent(eventSubmitter, 
TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+      jobExecutionPlan.setExecutionStatus(PENDING);
     }
   }
 
@@ -448,7 +435,7 @@ public class DagManager extends AbstractIdleService {
         this.dagManagerThreads = new DagManagerThread[numThreads];
         for (int i = 0; i < numThreads; i++) {
           DagManagerThread dagManagerThread = new 
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore, 
dagActionStore,
-              runQueue[i], cancelQueue[i], resumeQueue[i], 
instrumentationEnabled, failedDagIds, this.dagManagerMetrics,
+              runQueue[i], cancelQueue[i], resumeQueue[i], failedDagIds, 
this.dagManagerMetrics,
               this.defaultJobStartSlaTimeMillis, quotaManager, i);
           this.dagManagerThreads[i] = dagManagerThread;
           this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, 
this.pollingInterval, TimeUnit.SECONDS);
@@ -457,7 +444,7 @@ public class DagManager extends AbstractIdleService {
         
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0, 
retentionPollingInterval, TimeUnit.MINUTES);
         loadDagFromDagStateStore();
         this.houseKeepingThreadPool = 
Executors.newSingleThreadScheduledExecutor();
-        for (int delay = houseKeepingThreadInitialDelay; delay < 
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
+        for (int delay = INITIAL_HOUSEKEEPING_THREAD_DELAY; delay < 
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
           this.houseKeepingThreadPool.schedule(() -> {
             try {
               loadDagFromDagStateStore();
@@ -510,8 +497,8 @@ public class DagManager extends AbstractIdleService {
     final Map<String, Long> dagToSLA = new HashMap<>();
     private final MetricContext metricContext;
     private final Set<String> dagIdstoClean = new HashSet<>();
-    private final Optional<EventSubmitter> eventSubmitter;
-    private final Optional<Timer> jobStatusPolledTimer;
+    private final EventSubmitter eventSubmitter;
+    private final Timer jobStatusPolledTimer;
     private final AtomicLong orchestrationDelay = new AtomicLong(0);
     private final DagManagerMetrics dagManagerMetrics;
     private final UserQuotaManager quotaManager;
@@ -523,13 +510,13 @@ public class DagManager extends AbstractIdleService {
     private final BlockingQueue<DagId> resumeQueue;
     private final Long defaultJobStartSlaTimeMillis;
     private final Optional<DagActionStore> dagActionStore;
-    private final Optional<Meter> dagManagerThreadHeartbeat;
+    private final Meter dagManagerThreadHeartbeat;
     /**
      * Constructor.
      */
     DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore 
dagStateStore, DagStateStore failedDagStateStore,
         Optional<DagActionStore> dagActionStore, 
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<DagId> cancelQueue,
-        BlockingQueue<DagId> resumeQueue, boolean instrumentationEnabled, 
Set<String> failedDagIds, DagManagerMetrics dagManagerMetrics,
+        BlockingQueue<DagId> resumeQueue, Set<String> failedDagIds, 
DagManagerMetrics dagManagerMetrics,
         Long defaultJobStartSla, UserQuotaManager quotaManager, int 
dagMangerThreadId) {
       this.jobStatusRetriever = jobStatusRetriever;
       this.dagStateStore = dagStateStore;
@@ -542,21 +529,13 @@ public class DagManager extends AbstractIdleService {
       this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
       this.quotaManager = quotaManager;
       this.dagActionStore = dagActionStore;
-
-      if (instrumentationEnabled) {
-        this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
-        this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
-        this.jobStatusPolledTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
-        ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
-            orchestrationDelay::get);
-        this.metricContext.register(orchestrationDelayMetric);
-        this.dagManagerThreadHeartbeat = 
Optional.of(this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT,
 dagMangerThreadId)));
-      } else {
-        this.metricContext = null;
-        this.eventSubmitter = Optional.absent();
-        this.jobStatusPolledTimer = Optional.absent();
-        this.dagManagerThreadHeartbeat = Optional.absent();
-      }
+      this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+      this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build();
+      this.jobStatusPolledTimer = 
this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER);
+      ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
+          orchestrationDelay::get);
+      this.metricContext.register(orchestrationDelayMetric);
+      this.dagManagerThreadHeartbeat = 
this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT, 
dagMangerThreadId));
     }
 
     /**
@@ -645,7 +624,7 @@ public class DagManager extends AbstractIdleService {
           node.getValue().setCurrentAttempts(0);
           DagManagerUtils.incrementJobGeneration(node);
           Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
-          
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+          
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
         }
 
         // Set flowStartTime so that flow SLA will be based on current time 
instead of original flow
@@ -690,7 +669,7 @@ public class DagManager extends AbstractIdleService {
 
     /**
      * Cancels the dag and sends a cancellation tracking event.
-     * @param dagToCancel dag node to cancel
+     * @param dagId id of the dag to cancel
      * @throws ExecutionException executionException
      * @throws InterruptedException interruptedException
      */
@@ -731,11 +710,9 @@ public class DagManager extends AbstractIdleService {
     }
 
     private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
-      if (this.eventSubmitter.isPresent()) {
-        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-        
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
-        jobExecutionPlan.setExecutionStatus(CANCELLED);
-      }
+      Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+      
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+      jobExecutionPlan.setExecutionStatus(CANCELLED);
     }
 
     /**
@@ -1040,8 +1017,7 @@ public class DagManager extends AbstractIdleService {
         quotaManager.checkQuota(Collections.singleton(dagNode));
 
         producer = DagManagerUtils.getSpecProducer(dagNode);
-        TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
-            getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : 
null;
+        TimingEvent jobOrchestrationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
 
         // Increment job count before submitting the job onto the spec 
producer, in case that throws an exception.
         // By this point the quota is allocated, so it's imperative to 
increment as missing would introduce the potential to decrement below zero upon 
quota release.
@@ -1065,14 +1041,11 @@ public class DagManager extends AbstractIdleService {
         jobMetadata.put(TimingEvent.METADATA_MESSAGE, 
producer.getExecutionLink(addSpecFuture, specExecutorUri));
         // Add serialized job properties as part of the orchestrated job event 
metadata
         jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, 
dagNode.getValue().toString());
-        if (jobOrchestrationTimer != null) {
-          jobOrchestrationTimer.stop(jobMetadata);
-        }
+        jobOrchestrationTimer.stop(jobMetadata);
         log.info("Orchestrated job: {} on Executor: {}", 
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
         this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
       } catch (Exception e) {
-        TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
-            getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
+        TimingEvent jobFailedTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
         String message = "Cannot submit job " + 
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + 
specExecutorUri;
         log.error(message, e);
         jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + 
e.getMessage());
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 152dca00d..97201a4aa 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
@@ -332,8 +331,8 @@ public class DagManagerUtils {
     return dagNode.getValue().getSpecExecutor().getUri().toString();
   }
 
-  static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter, 
Dag<JobExecutionPlan> dag, String flowEvent) {
-    if (eventSubmitter.isPresent() && !dag.isEmpty()) {
+  static void emitFlowEvent(EventSubmitter eventSubmitter, 
Dag<JobExecutionPlan> dag, String flowEvent) {
+    if (!dag.isEmpty()) {
       // Every dag node will contain the same flow metadata
       Config config = getDagJobConfig(dag);
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(config);
@@ -345,7 +344,7 @@ public class DagManagerUtils {
         flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
       }
 
-      eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
+      eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ce7931c82..85c8248fb 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,12 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -31,12 +25,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
 import javax.annotation.Nonnull;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -68,8 +74,6 @@ import 
org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -83,20 +87,20 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   protected final Logger _log;
   protected final SpecCompiler specCompiler;
-  protected final Optional<TopologyCatalog> topologyCatalog;
-  protected final Optional<DagManager> dagManager;
+  protected final TopologyCatalog topologyCatalog;
+  protected final DagManager dagManager;
 
   protected final MetricContext metricContext;
 
-  protected final Optional<EventSubmitter> eventSubmitter;
+  protected final EventSubmitter eventSubmitter;
   private final boolean isFlowConcurrencyEnabled;
   @Getter
-  private Optional<Meter> flowOrchestrationSuccessFulMeter;
+  private Meter flowOrchestrationSuccessFulMeter;
   @Getter
-  private Optional<Meter> flowOrchestrationFailedMeter;
+  private Meter flowOrchestrationFailedMeter;
   @Getter
-  private Optional<Timer> flowOrchestrationTimer;
-  private Optional<Counter> flowFailedForwardToDagManagerCounter;
+  private Timer flowOrchestrationTimer;
+  private Counter flowFailedForwardToDagManagerCounter;
   @Setter
   private FlowStatusGenerator flowStatusGenerator;
 
@@ -109,10 +113,10 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   private final ClassAliasResolver<SpecCompiler> aliasResolver;
 
-  public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager,
-      Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean 
instrumentationEnabled,
-      Optional<FlowTriggerHandler> flowTriggerHandler, 
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
-      Optional<FlowCatalog> flowCatalog) {
+  @Inject
+  public Orchestrator(Config config, TopologyCatalog topologyCatalog, 
DagManager dagManager,
+      Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, 
Optional<FlowTriggerHandler> flowTriggerHandler,
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
Optional<FlowCatalog> flowCatalog) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
@@ -135,25 +139,15 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     }
 
     //At this point, the TopologySpecMap is initialized by the SpecCompiler. 
Pass the TopologySpecMap to the DagManager.
-    if (this.dagManager.isPresent()) {
-      
this.dagManager.get().setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
-    }
+    this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
+
+    this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.specCompiler.getClass());
+    this.flowOrchestrationSuccessFulMeter = 
this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER);
+    this.flowOrchestrationFailedMeter = 
this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER);
+    this.flowOrchestrationTimer = 
this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER);
+    this.flowFailedForwardToDagManagerCounter = 
this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT);
+    this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build();
 
-    if (instrumentationEnabled) {
-      this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.specCompiler.getClass());
-      this.flowOrchestrationSuccessFulMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
-      this.flowOrchestrationFailedMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
-      this.flowOrchestrationTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
-      this.flowFailedForwardToDagManagerCounter = 
Optional.of(this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT));
-      this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
-    } else {
-      this.metricContext = null;
-      this.flowOrchestrationSuccessFulMeter = Optional.absent();
-      this.flowOrchestrationFailedMeter = Optional.absent();
-      this.flowOrchestrationTimer = Optional.absent();
-      this.flowFailedForwardToDagManagerCounter = Optional.absent();
-      this.eventSubmitter = Optional.absent();
-    }
     this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
         ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
     quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
@@ -163,15 +157,6 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         quotaManager, eventSubmitter, flowStatusGenerator, 
isFlowConcurrencyEnabled);
   }
 
-  @Inject
-  public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, 
Optional<TopologyCatalog> topologyCatalog,
-      Optional<DagManager> dagManager, Optional<Logger> log, 
Optional<FlowTriggerHandler> flowTriggerHandler,
-      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
Optional<FlowCatalog> flowCatalog) {
-    this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, 
flowTriggerHandler,
-        sharedFlowMetricsSingleton, flowCatalog);
-  }
-
-
   @VisibleForTesting
   public SpecCompiler getSpecCompiler() {
     return this.specCompiler;
@@ -199,9 +184,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, 
Properties headers) {
     _log.info("Spec deletion detected: " + deletedSpecURI + "/" + 
deletedSpecVersion);
 
-    if (topologyCatalog.isPresent()) {
-      this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion, 
headers);
-    }
+    this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion, 
headers);
   }
 
   /** {@inheritDoc} */
@@ -226,13 +209,12 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     } catch (Exception e) {
       _log.error("Failed to update Spec: " + updatedSpec, e);
     }
-
   }
 
   public void orchestrate(Spec spec, Properties jobProps, long 
triggerTimestampMillis, boolean isReminderEvent)
       throws Exception {
     // Add below waiting because TopologyCatalog and FlowCatalog service can 
be launched at the same time
-    this.topologyCatalog.get().getInitComplete().await();
+    this.topologyCatalog.getInitComplete().await();
 
     //Wait for the SpecCompiler to become healthy.
     this.getSpecCompiler().awaitHealthy();
@@ -261,9 +243,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
               jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
           flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration 
skipped because no trigger timestamp "
               + "associated with flow action.");
-          if (this.eventSubmitter.isPresent()) {
-            new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
-          }
+          new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
           return;
         }
 
@@ -273,8 +253,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
             flowAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
       } else {
-        Optional<TimingEvent> flowCompilationTimer =
-          this.eventSubmitter.transform(submitter -> new 
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+        TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
         Optional<Dag<JobExecutionPlan>> compiledDagOptional =
             
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 spec, flowGroup,
                 flowName);
@@ -284,7 +263,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
           return;
         }
         Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
-        if (compiledDag == null || compiledDag.isEmpty()) {
+        if (compiledDag.isEmpty()) {
           
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 spec, flowMetadata);
           Instrumented.markMeter(this.flowOrchestrationFailedMeter);
           
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
@@ -296,47 +275,10 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
             SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
 
         
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
compiledDag);
-        if (flowCompilationTimer.isPresent()) {
-          flowCompilationTimer.get().stop(flowMetadata);
-        }
+        flowCompilationTimer.stop(flowMetadata);
 
         // Depending on if DagManager is present, handle execution
-        if (this.dagManager.isPresent()) {
-          submitFlowToDagManager(flowSpec, compiledDag);
-        } else {
-          // Schedule all compiled JobSpecs on their respective Executor
-          for (Dag.DagNode<JobExecutionPlan> dagNode : compiledDag.getNodes()) 
{
-            DagManagerUtils.incrementJobAttempt(dagNode);
-            JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-
-            // Run this spec on selected executor
-            SpecProducer producer = null;
-            try {
-              producer = 
jobExecutionPlan.getSpecExecutor().getProducer().get();
-              Spec jobSpec = jobExecutionPlan.getJobSpec();
-
-              if (!((JobSpec) 
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
-                _log.warn("JobSpec does not contain flowExecutionId: {}", 
jobSpec);
-              }
-
-              Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
-              _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
-
-              Optional<TimingEvent> jobOrchestrationTimer = 
this.eventSubmitter.transform(
-                  submitter -> new TimingEvent(submitter, 
TimingEvent.LauncherTimings.JOB_ORCHESTRATED));
-
-              producer.addSpec(jobSpec);
-
-              if (jobOrchestrationTimer.isPresent()) {
-                jobOrchestrationTimer.get().stop(jobMetadata);
-              }
-            } catch (Exception e) {
-              _log.error("Cannot successfully setup spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer
-                  + " for flow: " + spec, e);
-            }
-          }
-          deleteSpecFromCatalogIfAdhoc(flowSpec);
-        }
+        submitFlowToDagManager(flowSpec, compiledDag);
       }
     } else {
       Instrumented.markMeter(this.flowOrchestrationFailedMeter);
@@ -362,7 +304,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       throws IOException {
     try {
       // Send the dag to the DagManager
-      this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+      this.dagManager.addDag(jobExecutionPlanDag, true, true);
 
       /*
       Adhoc flows can be deleted after persisting it in DagManager as the 
DagManager's failure recovery method ensures
@@ -373,15 +315,11 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     } catch (Exception ex) {
       String failureMessage = "Failed to add Job Execution Plan due to: " + 
ex.getMessage();
       _log.warn("Orchestrator call - " + failureMessage, ex);
-      if (this.flowFailedForwardToDagManagerCounter.isPresent()) {
-        this.flowFailedForwardToDagManagerCounter.get().inc();
-      }
-      if (this.eventSubmitter.isPresent()) {
-        // pronounce failed before stack unwinds, to ensure flow not marooned 
in `COMPILED` state; (failure likely attributable to DB connection/failover)
-        Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-        flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
-        new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
-      }
+      this.flowFailedForwardToDagManagerCounter.inc();
+      // pronounce failed before stack unwinds, to ensure flow not marooned in 
`COMPILED` state; (failure likely attributable to DB connection/failover)
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
+      new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
       throw ex;
     }
   }
@@ -393,10 +331,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     if (spec instanceof FlowSpec) {
       //Send the dag to the DagManager to stop it.
       //Also send it to the SpecProducer to do any cleanup tasks on 
SpecExecutor.
-      if (this.dagManager.isPresent()) {
-        _log.info("Forwarding cancel request for flow URI {} to DagManager.", 
spec.getUri());
-        this.dagManager.get().stopDag(spec.getUri());
-      }
+      _log.info("Forwarding cancel request for flow URI {} to DagManager.", 
spec.getUri());
+      this.dagManager.stopDag(spec.getUri());
       // We need to recompile the flow to find the spec producer,
       // If compilation result is different, its remove request can go to some 
different spec producer
       deleteFromExecutor(spec, headers);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 7ccc094b1..cd1bd421f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,12 +17,6 @@
 
 package org.apache.gobblin.service.modules.scheduler;
 
-import com.codahale.metrics.MetricFilter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.text.ParseException;
@@ -38,12 +32,35 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.CronExpression;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricFilter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
@@ -76,19 +93,6 @@ import 
org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
-import org.apache.helix.HelixManager;
-import org.quartz.CronExpression;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
 
@@ -168,7 +172,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   @Inject
   public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String 
serviceName,
       Config config,
-      Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
Optional<TopologyCatalog> topologyCatalog,
+      Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
       Orchestrator orchestrator, SchedulerService schedulerService, 
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
       @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled,
       Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
@@ -207,13 +211,13 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   }
 
   public GobblinServiceJobScheduler(String serviceName, Config config, 
FlowStatusGenerator flowStatusGenerator,
-      Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
Optional<TopologyCatalog> topologyCatalog,
-      Optional<DagManager> dagManager, Optional<UserQuotaManager> 
quotaManager, SchedulerService schedulerService,
+      Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
TopologyCatalog topologyCatalog,
+      DagManager dagManager, Optional<UserQuotaManager> quotaManager, 
SchedulerService schedulerService,
       Optional<Logger> log, boolean isWarmStandbyEnabled, Optional 
<FlowTriggerHandler> flowTriggerHandler,
       SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
       throws Exception {
-    this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
-        new Orchestrator(config, flowStatusGenerator, topologyCatalog, 
dagManager, log, flowTriggerHandler,
+    this(serviceName, config, helixManager, flowCatalog,
+        new Orchestrator(config, topologyCatalog, dagManager, log, 
flowStatusGenerator, flowTriggerHandler,
             sharedFlowMetricsSingleton, flowCatalog),
         schedulerService, quotaManager, log, isWarmStandbyEnabled, 
flowTriggerHandler);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 78b5446bf..674ff0024 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -56,7 +56,7 @@ public final class FlowCompilationValidationHelper {
   private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
   private final SpecCompiler specCompiler;
   private final UserQuotaManager quotaManager;
-  private final Optional<EventSubmitter> eventSubmitter;
+  private final EventSubmitter eventSubmitter;
   private final FlowStatusGenerator flowStatusGenerator;
   private final boolean isFlowConcurrencyEnabled;
 
@@ -66,7 +66,7 @@ public final class FlowCompilationValidationHelper {
    * caller.
    * @param flowSpec
    * @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass 
the ID "laundered" via the DB;
-   *                                see: {@link MysqlMultiActiveLeaseArbiter 
javadoc section titled
+   *                                see: {@link 
org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter javadoc section 
titled
    *                                `Database event_timestamp laundering`}
    * @return jobExecutionPlan dag if one can be constructed for the given 
flowSpec
    */
@@ -79,8 +79,7 @@ public final class FlowCompilationValidationHelper {
     //Wait for the SpecCompiler to become healthy.
     specCompiler.awaitHealthy();
 
-    Optional<TimingEvent> flowCompilationTimer =
-        this.eventSubmitter.transform(submitter -> new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILED));
+    TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_COMPILED);
     Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
         validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup, 
flowName);
     Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
@@ -89,15 +88,13 @@ public final class FlowCompilationValidationHelper {
       return Optional.absent();
     }
 
-    if (jobExecutionPlanDagOptional.get() == null || 
jobExecutionPlanDagOptional.get().isEmpty()) {
+    if (jobExecutionPlanDagOptional.get().isEmpty()) {
       populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, 
flowMetadata);
       return Optional.absent();
     }
 
     addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId, 
jobExecutionPlanDagOptional.get());
-    if (flowCompilationTimer.isPresent()) {
-      flowCompilationTimer.get().stop(flowMetadata);
-    }
+    flowCompilationTimer.stop(flowMetadata);
     return jobExecutionPlanDagOptional;
   }
 
@@ -133,9 +130,7 @@ public final class FlowCompilationValidationHelper {
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
       flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because 
another instance is running and concurrent "
           + "executions are disabled. Set flow.allowConcurrentExecution to 
true in the flow spec to change this behaviour.");
-      if (eventSubmitter.isPresent()) {
-        new TimingEvent(eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
-      }
+      new TimingEvent(eventSubmitter, 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
       return Optional.absent();
     }
   }
@@ -158,7 +153,7 @@ public final class FlowCompilationValidationHelper {
    * @param spec
    * @param flowMetadata
    */
-  public static void 
populateFlowCompilationFailedEventMessage(Optional<EventSubmitter> 
eventSubmitter, Spec spec,
+  public static void populateFlowCompilationFailedEventMessage(EventSubmitter 
eventSubmitter, Spec spec,
       Map<String, String> flowMetadata) {
     // For scheduled flows, we do not insert the flowExecutionId into the 
FlowSpec. As a result, if the flow
     // compilation fails (i.e. we are unable to find a path), the metadata 
will not have flowExecutionId.
@@ -172,12 +167,7 @@ public final class FlowCompilationValidationHelper {
     }
     flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
 
-    Optional<TimingEvent> flowCompileFailedTimer = 
eventSubmitter.transform(submitter ->
-        new TimingEvent(submitter, 
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
-
-    if (flowCompileFailedTimer.isPresent()) {
-      flowCompileFailedTimer.get().stop(flowMetadata);
-    }
+    new TimingEvent(eventSubmitter, 
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED).stop(flowMetadata);
   }
 
   /**
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 15fa2eee6..e27ce534e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -43,8 +43,6 @@ import 
org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.function.CheckedExceptionFunction;
 
 
@@ -63,8 +61,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
 
   @Inject
   public FsJobStatusRetriever(Config config, MultiContextIssueRepository 
issueRepository) {
-    super(ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
-        ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED), 
issueRepository);
+    super(issueRepository);
     this.stateStore = (FileContextBasedFsStateStore<State>) new 
FileContextBasedFsStateStoreFactory().
         createStateStore(config.getConfig(CONF_PREFIX), State.class);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index 84dbb60ce..887ee5827 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -37,7 +37,6 @@ import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceConfigKeys;
 
 
 /**
@@ -54,7 +53,7 @@ public class LocalFsJobStatusRetriever extends 
JobStatusRetriever {
   // Do not use a state store for this implementation, just look at the job 
folder that @LocalFsSpecProducer writes to
   @Inject
   public LocalFsJobStatusRetriever(Config config, MultiContextIssueRepository 
issueRepository) {
-    super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED, 
issueRepository);
+    super(issueRepository);
     this.specProducerPath = config.getString(CONF_PREFIX + 
LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 0b593165d..6b8d1e251 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -35,8 +35,6 @@ import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -65,8 +63,7 @@ public class MysqlJobStatusRetriever extends 
JobStatusRetriever {
 
   @Inject
   public MysqlJobStatusRetriever(Config config, MultiContextIssueRepository 
issueRepository) throws ReflectiveOperationException {
-    super(ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
-        ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED), 
issueRepository);
+    super(issueRepository);
     config = 
config.getConfig(MYSQL_JOB_STATUS_RETRIEVER_PREFIX).withFallback(config);
     this.stateStore = 
(MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config, 
State.class);
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 08e89ef0b..38bfc5bee 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -28,9 +28,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
-
-import 
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
-import 
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
 import org.eclipse.jgit.api.Git;
@@ -61,13 +58,20 @@ import 
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
+import 
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import 
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
 
 public class GobblinServiceManagerTest {
 
@@ -193,14 +197,29 @@ public class GobblinServiceManagerTest {
     this.gitForPush.commit().setMessage("First commit").call();
     this.gitForPush.push().setRemote("origin").setRefSpecs(new 
RefSpec("master")).call();
 
-    this.gobblinServiceManager = GobblinServiceManager.create("CoreService", 
"1",
-        ConfigUtils.propertiesToConfig(serviceCoreProperties), new 
Path(SERVICE_WORK_DIR));
+    this.gobblinServiceManager = 
createTestGobblinServiceManager(serviceCoreProperties);
+
     this.gobblinServiceManager.start();
 
     this.flowConfigClient = new 
FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
         this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), 
transportClientProperties);
   }
 
+  public static GobblinServiceManager 
createTestGobblinServiceManager(Properties serviceCoreProperties) {
+    return createTestGobblinServiceManager(serviceCoreProperties, 
"CoreService", "1", SERVICE_WORK_DIR);
+  }
+
+  public static GobblinServiceManager 
createTestGobblinServiceManager(Properties serviceCoreProperties,
+      String serviceName, String serviceId, String serviceWorkDir) {
+    GobblinServiceManager gobblinServiceManager = 
GobblinServiceManager.create(serviceName, serviceId,
+        ConfigUtils.propertiesToConfig(serviceCoreProperties), new 
Path(serviceWorkDir));
+
+    DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
+    doNothing().when(spiedDagManager).setActive(anyBoolean());
+    gobblinServiceManager.dagManager = spiedDagManager;
+    return gobblinServiceManager;
+  }
+
   private void cleanUpDir(String dir) throws Exception {
     File specStoreDir = new File(dir);
     if (specStoreDir.exists()) {
@@ -555,8 +574,7 @@ null, null, null, null);
 
   private void serviceReboot() throws Exception {
     this.gobblinServiceManager.stop();
-    this.gobblinServiceManager = GobblinServiceManager.create("CoreService", 
"1",
-        ConfigUtils.propertiesToConfig(serviceCoreProperties), new 
Path(SERVICE_WORK_DIR));
+    this.gobblinServiceManager = 
createTestGobblinServiceManager(serviceCoreProperties);
     this.gobblinServiceManager.start();
     this.flowConfigClient = new 
FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
         this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), 
transportClientProperties);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 29782af11..3a61a7301 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -23,7 +23,6 @@ import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,12 +45,12 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.GobblinServiceManagerTest;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.TestServiceDatabaseConfig;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.util.ConfigUtils;
 
 
 @Test
@@ -173,13 +172,13 @@ public class GobblinServiceHATest {
     node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
 
     // Start Node 1
-    this.node1GobblinServiceManager = 
GobblinServiceManager.create("CoreService1", "1",
-        ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new 
Path(NODE_1_SERVICE_WORK_DIR));
+    this.node1GobblinServiceManager = 
GobblinServiceManagerTest.createTestGobblinServiceManager(
+        node1ServiceCoreProperties, "CoreService1", "1", 
NODE_1_SERVICE_WORK_DIR);
     this.node1GobblinServiceManager.start();
 
     // Start Node 2
-    this.node2GobblinServiceManager = 
GobblinServiceManager.create("CoreService2", "2",
-        ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new 
Path(NODE_2_SERVICE_WORK_DIR));
+    this.node2GobblinServiceManager = 
GobblinServiceManagerTest.createTestGobblinServiceManager(
+        node2ServiceCoreProperties, "CoreService2", "2", 
NODE_2_SERVICE_WORK_DIR);
     this.node2GobblinServiceManager.start();
 
     // Initialize Node 1 Client
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
index 4f6fbded9..7d2648a28 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -46,12 +46,12 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.GobblinServiceManagerTest;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.TestServiceDatabaseConfig;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PortUtils;
 
 
@@ -177,13 +177,13 @@ public class GobblinServiceRedirectTest {
     node2ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port2);
 
     // Start Node 1
-    this.node1GobblinServiceManager = 
GobblinServiceManager.create("RedirectCoreService1", "1",
-        ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new 
Path(NODE_1_SERVICE_WORK_DIR));
+    this.node1GobblinServiceManager = 
GobblinServiceManagerTest.createTestGobblinServiceManager(
+        node1ServiceCoreProperties, "RedirectCoreService1", "1", 
NODE_1_SERVICE_WORK_DIR);
     this.node1GobblinServiceManager.start();
 
     // Start Node 2
-    this.node2GobblinServiceManager = 
GobblinServiceManager.create("RedirectCoreService2", "2",
-        ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new 
Path(NODE_2_SERVICE_WORK_DIR));
+    this.node2GobblinServiceManager = 
GobblinServiceManagerTest.createTestGobblinServiceManager(
+        node2ServiceCoreProperties, "RedirectCoreService2", "2", 
NODE_2_SERVICE_WORK_DIR);
     this.node2GobblinServiceManager.start();
 
     // Initialize Node 1 Client
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index a72f45732..d61d9a6de 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -90,7 +90,7 @@ public class DagManagerFlowTest {
     dagActionStore = new MysqlDagActionStore(config);
     dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL);
     dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, 
DagActionStore.FlowActionType.RESUME);
-    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), 
false);
+    dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props));
     dagManager.dagActionStore = Optional.of(dagActionStore);
     dagManager.setActive(true);
     this.dagNumThreads = dagManager.getNumThreads();
@@ -391,9 +391,9 @@ class CancelPredicate implements Predicate<Void> {
 
 class MockedDagManager extends DagManager {
 
-  public MockedDagManager(Config config, boolean instrumentationEnabled) {
+  public MockedDagManager(Config config) {
     super(config, createJobStatusRetriever(), 
Mockito.mock(SharedFlowMetricsSingleton.class),
-        Mockito.mock(FlowStatusGenerator.class), 
Mockito.mock(FlowCatalog.class), instrumentationEnabled);
+        Mockito.mock(FlowStatusGenerator.class), 
Mockito.mock(FlowCatalog.class));
   }
 
   private static JobStatusRetriever createJobStatusRetriever() {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 2babd0683..d5aec8ad4 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -106,7 +106,7 @@ public class DagManagerTest {
     this._dagManagerMetrics.activate();
     this._dagManagerThread = new 
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, 
_failedDagStateStore,
         Optional.absent(), queue, cancelQueue,
-        resumeQueue, true, new HashSet<>(), this._dagManagerMetrics, 
START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
+        resumeQueue, new HashSet<>(), this._dagManagerMetrics, 
START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
 
     Field jobToDagField = 
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
     jobToDagField.setAccessible(true);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 7e7daef9b..8d64b0978 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -17,24 +17,33 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.typesafe.config.Config;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
@@ -46,15 +55,10 @@ import 
org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
 
 
 public class OrchestratorTest {
@@ -73,10 +77,7 @@ public class OrchestratorTest {
   private TopologySpec topologySpec;
 
   private FlowCatalog flowCatalog;
-  private SpecCatalogListener mockListener;
   private FlowSpec flowSpec;
-  private FlowStatusGenerator mockStatusGenerator;
-  private FlowTriggerHandler _mockFlowTriggerHandler;
   private Orchestrator orchestrator;
 
   @BeforeClass
@@ -103,12 +104,14 @@ public class OrchestratorTest {
         Optional.of(logger), Optional.<MetricContext>absent(), true, true);
 
     this.serviceLauncher.addService(flowCatalog);
-    this.mockStatusGenerator = mock(FlowStatusGenerator.class);
+    FlowStatusGenerator mockStatusGenerator = mock(FlowStatusGenerator.class);
+    FlowTriggerHandler mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
+    DagManager mockDagManager = mock(DagManager.class);
+    doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
 
-    this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
     this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
-        this.mockStatusGenerator, Optional.of(this.topologyCatalog), 
Optional.<DagManager>absent(), Optional.of(logger),
-         Optional.of(this._mockFlowTriggerHandler), new 
SharedFlowMetricsSingleton(
+        this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockStatusGenerator,
+        Optional.of(mockFlowTriggerHandler), new SharedFlowMetricsSingleton(
              ConfigUtils.propertiesToConfig(orchestratorProperties)), 
Optional.of(mock(FlowCatalog.class)));
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 0fdfc894b..45f8323a3 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -50,7 +50,6 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
-import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -58,9 +57,9 @@ import 
org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
 import 
org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
-import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -135,7 +134,7 @@ public class GobblinServiceJobSchedulerTest {
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new 
TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, 
mockOrchestrator, Optional.of(quotaManager), null, false);
+        ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator, 
Optional.of(quotaManager), null, false);
 
     SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -221,7 +220,7 @@ public class GobblinServiceJobSchedulerTest {
 
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new 
TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, 
mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null, 
false);
+        ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator, 
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null, false);
 
     SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
     Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -284,7 +283,7 @@ public class GobblinServiceJobSchedulerTest {
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler.
     TestGobblinServiceJobScheduler scheduler = new 
TestGobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.of(flowCatalog), null, 
mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), 
schedulerService, false);
+        ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator, 
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), schedulerService, 
false);
 
     schedulerService.startAsync().awaitRunning();
     scheduler.startUp();
@@ -355,7 +354,7 @@ public class GobblinServiceJobSchedulerTest {
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler not in warm standby mode
     GobblinServiceJobScheduler scheduler = new 
GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService,
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
mockOrchestrator, schedulerService,
         Optional.of(new InMemoryUserQuotaManager(quotaConfig)), 
Optional.absent(), false, Optional.of(Mockito.mock(
         FlowTriggerHandler.class)));
 
@@ -374,9 +373,9 @@ public class GobblinServiceJobSchedulerTest {
 
     //Mock a GaaS scheduler in warm standby mode, where we don't check quota
     GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new 
GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService,
-        Optional.of(new InMemoryUserQuotaManager(quotaConfig)), 
Optional.absent(), true, Optional.of(Mockito.mock(
-        FlowTriggerHandler.class)));
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
mockOrchestrator, schedulerService,
+        Optional.of(new InMemoryUserQuotaManager(quotaConfig)), 
Optional.absent(), true,
+        Optional.of(Mockito.mock(FlowTriggerHandler.class)));
 
     schedulerWithWarmStandbyEnabled.startUp();
     schedulerWithWarmStandbyEnabled.setActive(true);
@@ -396,9 +395,9 @@ public class GobblinServiceJobSchedulerTest {
     private boolean hasScheduler = false;
 
     public TestGobblinServiceJobScheduler(String serviceName, Config config,
-        Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> 
quotaManager,
+        Optional<FlowCatalog> flowCatalog, Orchestrator orchestrator, 
Optional<UserQuotaManager> quotaManager,
         SchedulerService schedulerService, boolean isWarmStandbyEnabled) 
throws Exception {
-      super(serviceName, config, Optional.absent(), flowCatalog, 
topologyCatalog, orchestrator, schedulerService,
+      super(serviceName, config, Optional.absent(), flowCatalog, orchestrator, 
schedulerService,
           quotaManager, Optional.absent(), isWarmStandbyEnabled, 
Optional.of(Mockito.mock(FlowTriggerHandler.class)));
       if (schedulerService != null) {
         hasScheduler = true;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index 257726e1d..5af639cb5 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -32,7 +32,6 @@ import com.typesafe.config.ConfigValueFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
 import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceConfigKeys;
 
 import static org.mockito.Mockito.mock;
 
@@ -46,9 +45,7 @@ public class FsJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     cleanUpDir();
     Config config = ConfigFactory.empty()
         .withValue(FsJobStatusRetriever.CONF_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
-        ConfigValueFactory.fromAnyRef(stateStoreDir))
-        .withValue(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
-            ConfigValueFactory.fromAnyRef("true"));
+        ConfigValueFactory.fromAnyRef(stateStoreDir));
     this.jobStatusRetriever = new FsJobStatusRetriever(config, 
mock(MultiContextIssueRepository.class));
   }
 
@@ -83,31 +80,31 @@ public class FsJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
 
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.COMPILED.name());
     Assert.assertEquals(ExecutionStatus.COMPILED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, 
JOB_ORCHESTRATED_TIME);
     Assert.assertEquals(ExecutionStatus.COMPILED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.ORCHESTRATED.name());
     Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
     Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.RUNNING.name());
     Assert.assertEquals(ExecutionStatus.RUNNING,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
     Assert.assertEquals(ExecutionStatus.RUNNING,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.COMPLETE.name());
     Assert.assertEquals(ExecutionStatus.COMPLETE,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
   }
 
   @Override
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java
deleted file mode 100644
index c91fc5be3..000000000
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.monitoring;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
-import org.apache.gobblin.service.ExecutionStatus;
-
-import static org.mockito.Mockito.mock;
-
-
-public class FsJobStatusRetrieverTestWithoutDagManager extends 
JobStatusRetrieverTest {
-
-  private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
-
-  @BeforeClass
-  public void setUp() throws Exception {
-    cleanUpDir();
-    Config config = 
ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
-        ConfigValueFactory.fromAnyRef(stateStoreDir));
-    this.jobStatusRetriever = new FsJobStatusRetriever(config, 
mock(MultiContextIssueRepository.class));
-  }
-
-  @Test
-  public void testGetJobStatusesForFlowExecution() throws IOException {
-    super.testGetJobStatusesForFlowExecution();
-  }
-
-  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
-  public void testJobTiming() throws Exception {
-    super.testJobTiming();
-  }
-
-  @Test (dependsOnMethods = "testJobTiming")
-  public void testOutOfOrderJobTimingEvents() throws IOException {
-    super.testOutOfOrderJobTimingEvents();
-  }
-
-  @Test (dependsOnMethods = "testJobTiming")
-  public void testGetJobStatusesForFlowExecution1() {
-    super.testGetJobStatusesForFlowExecution1();
-  }
-
-  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
-  public void testGetLatestExecutionIdsForFlow() throws Exception {
-    super.testGetLatestExecutionIdsForFlow();
-  }
-
-  @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
-  public void testGetFlowStatusFromJobStatuses() throws Exception {
-    long flowExecutionId = 1237L;
-
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.COMPILED.name());
-    Assert.assertEquals(ExecutionStatus.$UNKNOWN,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.ORCHESTRATED.name());
-    Assert.assertEquals(ExecutionStatus.$UNKNOWN,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, 
JOB_ORCHESTRATED_TIME);
-    Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.RUNNING.name());
-    Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
-    Assert.assertEquals(ExecutionStatus.RUNNING,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
-    Assert.assertEquals(ExecutionStatus.RUNNING,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.COMPLETE.name());
-    Assert.assertEquals(ExecutionStatus.COMPLETE,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-  }
-
-  @Override
-  protected void cleanUpDir() throws Exception {
-    File specStoreDir = new File(this.stateStoreDir);
-    if (specStoreDir.exists()) {
-      FileUtils.deleteDirectory(specStoreDir);
-    }
-  }
-}
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 00b2598c8..345b5d7cc 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -175,7 +175,7 @@ public abstract class JobStatusRetrieverTest {
     Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
     Assert.assertFalse(jobStatusIterator.hasNext());
     Assert.assertEquals(ExecutionStatus.RUNNING,
-        
this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.dagManagerEnabled,
 this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2, 
ExecutionStatus.RUNNING.name());
     jobStatusIterator = 
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index f26ebf59e..66d552e54 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -56,7 +56,6 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
     
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
 + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
     
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
 + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
     
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
 + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
-    
configBuilder.addPrimitive(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
 "true");
 
     this.jobStatusRetriever =
         new MysqlJobStatusRetriever(configBuilder.build(), 
mock(MultiContextIssueRepository.class));
@@ -95,31 +94,31 @@ public class MysqlJobStatusRetrieverTest extends 
JobStatusRetrieverTest {
 
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.COMPILED.name());
     Assert.assertEquals(ExecutionStatus.COMPILED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, 
JOB_ORCHESTRATED_TIME);
     Assert.assertEquals(ExecutionStatus.COMPILED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.ORCHESTRATED.name());
     Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
     Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.RUNNING.name());
     Assert.assertEquals(ExecutionStatus.RUNNING,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
     Assert.assertEquals(ExecutionStatus.RUNNING,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
 
     addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.COMPLETE.name());
     Assert.assertEquals(ExecutionStatus.COMPLETE,
-        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
+        
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
 FLOW_GROUP, flowExecutionId)));
   }
 
   @Test
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
deleted file mode 100644
index 03a621927..000000000
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.monitoring;
-
-import java.io.IOException;
-
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
-import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
-import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
-import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceConfigKeys;
-
-import static org.mockito.Mockito.mock;
-
-
-/**
- * Flow status can be different when DagManager is not being used. So we need 
separate unit tests for testing job/flow
- * status when DagManager is disabled.
- */
-public class MysqlJobStatusRetrieverTestWithoutDagManager extends 
JobStatusRetrieverTest {
-  private MysqlJobStatusStateStore<State> dbJobStateStore;
-  private static final String TEST_USER = "testUser";
-  private static final String TEST_PASSWORD = "testPassword";
-
-  @BeforeClass
-  @Override
-  public void setUp() throws Exception {
-    ITestMetastoreDatabase testMetastoreDatabase = 
TestMetastoreDatabaseFactory.get();
-    String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
-
-    ConfigBuilder configBuilder = ConfigBuilder.create();
-    
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
 + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
-    
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
 + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
-    
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
 + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
-
-    
configBuilder.addPrimitive(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
 "false");
-    this.jobStatusRetriever =
-        new MysqlJobStatusRetriever(configBuilder.build(), 
mock(MultiContextIssueRepository.class));
-    this.dbJobStateStore = ((MysqlJobStatusRetriever) 
this.jobStatusRetriever).getStateStore();
-    cleanUpDir();
-  }
-
-  @Test
-  public void testGetJobStatusesForFlowExecution() throws IOException {
-    super.testGetJobStatusesForFlowExecution();
-  }
-
-  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
-  public void testJobTiming() throws Exception {
-    super.testJobTiming();
-  }
-
-  @Test (dependsOnMethods = "testJobTiming")
-  public void testOutOfOrderJobTimingEvents() throws IOException {
-    super.testOutOfOrderJobTimingEvents();
-  }
-
-  @Test (dependsOnMethods = "testJobTiming")
-  public void testGetJobStatusesForFlowExecution1() {
-    super.testGetJobStatusesForFlowExecution1();
-  }
-
-  @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
-  public void testGetLatestExecutionIdsForFlow() throws Exception {
-    super.testGetLatestExecutionIdsForFlow();
-  }
-
-  @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
-  public void testGetFlowStatusFromJobStatuses() throws Exception {
-    long flowExecutionId = 1237L;
-
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.COMPILED.name());
-    Assert.assertEquals(ExecutionStatus.$UNKNOWN,
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.ORCHESTRATED.name());
-    Assert.assertEquals(ExecutionStatus.$UNKNOWN,
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME, 
JOB_ORCHESTRATED_TIME);
-    Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.RUNNING.name());
-    Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
-    Assert.assertEquals(ExecutionStatus.RUNNING,
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY, 
ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
-    Assert.assertEquals(ExecutionStatus.RUNNING,
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-
-    addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1, 
ExecutionStatus.COMPLETE.name());
-    Assert.assertEquals(ExecutionStatus.COMPLETE,
-        
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
 jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP, 
flowExecutionId)));
-  }
-
-  @Override
-  void cleanUpDir() throws Exception {
-    
this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP,
 FLOW_NAME));
-  }
-}

Reply via email to