This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 221720526 [GOBBLIN-2013] update guice initialization for 'DagProcEngine enabled' and related classes  (#3892)
221720526 is described below

commit 2217205267d1ab184b4fbc1e4e94550c04b09831
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Mar 11 23:32:33 2024 -0700

    [GOBBLIN-2013] update guice initialization for 'DagProcEngine enabled' and related classes  (#3892)
    
    * update guice module
    * address review comment
---
 .../gobblin/runtime/util/InjectionNames.java       |  4 +++
 .../modules/core/GobblinServiceConfiguration.java  |  4 +++
 .../modules/core/GobblinServiceGuiceModule.java    | 27 ++++++++++++++++-
 .../MostlyMySqlDagManagementStateStore.java        |  4 +--
 .../modules/orchestration/Orchestrator.java        | 13 ++++----
 .../modules/orchestration/proc/LaunchDagProc.java  |  4 +--
 .../scheduler/GobblinServiceJobScheduler.java      | 13 ++++----
 .../utils/FlowCompilationValidationHelper.java     | 35 ++++++++++++++++++++--
 ...agementDagActionStoreChangeMonitorFactory.java} |  4 +--
 .../gobblin/service/GobblinServiceManagerTest.java |  2 ++
 .../DagManagementTaskStreamImplTest.java           |  5 ++--
 .../orchestration/DagProcessingEngineTest.java     |  2 +-
 .../MostlyMySqlDagManagementStateStoreTest.java    |  5 ++--
 .../modules/orchestration/OrchestratorTest.java    | 19 ++++++++++--
 .../scheduler/GobblinServiceJobSchedulerTest.java  |  8 ++---
 15 files changed, 115 insertions(+), 34 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
index b9ff94f8a..55aa2130d 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.runtime.util;
 
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
 /**
  * These names are used for dependency injection, when we need to inject 
different instances of the same type,
  * or inject constants.
@@ -29,4 +32,5 @@ public final class InjectionNames {
   // TODO: Rename `warm_standby_enabled` config to 
`message_forwarding_enabled` since it's a misnomer.
   public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";
   public static final String MULTI_ACTIVE_SCHEDULER_ENABLED = 
"multiActiveSchedulerEnabled";
+  public static final String DAG_PROC_ENGINE_ENABLED = 
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED;
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 1998b541d..83b64c900 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -73,6 +73,9 @@ public class GobblinServiceConfiguration {
   @Getter
   private final boolean onlyAnnounceLeader;
 
+  @Getter
+  private final boolean isDagProcessingEngineEnabled;
+
   @Getter
   private final Config innerConfig;
 
@@ -114,5 +117,6 @@ public class GobblinServiceConfiguration {
     this.isTopologySpecFactoryEnabled =
         ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
     this.onlyAnnounceLeader = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER, false);
+    this.isDagProcessingEngineEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index e108dec14..01b2598b0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -67,8 +67,15 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
 import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
 import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
+import 
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
@@ -79,10 +86,12 @@ import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionReso
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
 import 
org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
 import 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
+import 
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitorFactory;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
@@ -154,6 +163,10 @@ public class GobblinServiceGuiceModule implements Module {
     binder.bindConstant()
         
.annotatedWith(Names.named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED))
         .to(serviceConfig.isMultiActiveSchedulerEnabled());
+    binder.bindConstant()
+        .annotatedWith(Names.named(InjectionNames.DAG_PROC_ENGINE_ENABLED))
+        .to(serviceConfig.isDagProcessingEngineEnabled());
+
     OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
     if (serviceConfig.isWarmStandbyEnabled()) {
       binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
@@ -173,6 +186,12 @@ public class GobblinServiceGuiceModule implements Module {
       binder.bind(FlowTriggerHandler.class);
     }
 
+    binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
+    binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
+    
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class).in(Singleton.class);
+    binder.bind(DagProcFactory.class);
+    binder.bind(DagProcessingEngine.class);
+
     binder.bind(FlowConfigsResource.class);
     binder.bind(FlowConfigsV2Resource.class);
     binder.bind(FlowStatusResource.class);
@@ -188,6 +207,7 @@ public class GobblinServiceGuiceModule implements Module {
         .to(NoopRequesterService.class);
 
     binder.bind(SharedFlowMetricsSingleton.class);
+    binder.bind(FlowCompilationValidationHelper.class);
 
     OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
     binder.bind(TopologyCatalog.class);
@@ -248,7 +268,12 @@ public class GobblinServiceGuiceModule implements Module {
 
     if (serviceConfig.isWarmStandbyEnabled()) {
       
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
-      
binder.bind(DagActionStoreChangeMonitor.class).toProvider(DagActionStoreChangeMonitorFactory.class).in(Singleton.class);
+      if (serviceConfig.isDagProcessingEngineEnabled()) {
+        binder.bind(DagActionStoreChangeMonitor.class)
+            
.toProvider(DagManagementDagActionStoreChangeMonitorFactory.class).in(Singleton.class);
+      } else {
+        
binder.bind(DagActionStoreChangeMonitor.class).toProvider(DagActionStoreChangeMonitorFactory.class).in(Singleton.class);
+      }
     }
 
     binder.bind(GobblinServiceManager.class);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 48b789d11..10c69947f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -70,8 +70,8 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
   FlowCatalog flowCatalog;
 
   @Inject
-  public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog 
flowCatalog) throws IOException {
-    this.quotaManager = new MysqlUserQuotaManager(config);
+  public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog 
flowCatalog, UserQuotaManager userQuotaManager) {
+    this.quotaManager = userQuotaManager;
     this.config = config;
     this.flowCatalog = flowCatalog;
    }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 618d8b050..daf6aff3f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -111,14 +111,13 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   @Getter
   private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
 
-  private final ClassAliasResolver<SpecCompiler> aliasResolver;
-
   @Inject
   public Orchestrator(Config config, TopologyCatalog topologyCatalog, 
DagManager dagManager,
       Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, 
Optional<FlowTriggerHandler> flowTriggerHandler,
-      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
Optional<FlowCatalog> flowCatalog) {
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
Optional<FlowCatalog> flowCatalog,
+      DagManagementStateStore dagManagementStateStore, 
FlowCompilationValidationHelper flowCompilationValidationHelper) throws 
IOException {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
-    this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+    ClassAliasResolver<SpecCompiler> aliasResolver = new 
ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
     this.dagManager = dagManager;
     this.flowStatusGenerator = flowStatusGenerator;
@@ -132,7 +131,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       }
       _log.info("Using specCompiler class name/alias " + 
specCompilerClassName);
 
-      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
 config);
+      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(specCompilerClassName)),
 config);
     } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException |
              ClassNotFoundException e) {
       throw new RuntimeException(e);
@@ -140,6 +139,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
     //At this point, the TopologySpecMap is initialized by the SpecCompiler. 
Pass the TopologySpecMap to the DagManager.
     this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
+    ((MostlyMySqlDagManagementStateStore) 
dagManagementStateStore).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
 
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.specCompiler.getClass());
     this.flowOrchestrationSuccessFulMeter = 
this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER);
@@ -153,8 +153,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
         ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
         config);
-    this.flowCompilationValidationHelper = new 
FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
-        quotaManager, eventSubmitter, flowStatusGenerator, 
isFlowConcurrencyEnabled);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
   @VisibleForTesting
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 72a59ee0e..d9afff5b4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -45,9 +45,9 @@ public class LaunchDagProc extends 
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
   public LaunchDagProc(LaunchDagTask launchDagTask) {
     this.launchDagTask = launchDagTask;
     this.orchestrationDelayCounter = new AtomicLong(0);
-    ContextAwareGauge<Long> orchestrationDelayMetric = 
this.metricContext.newContextAwareGauge
+    ContextAwareGauge<Long> orchestrationDelayMetric = 
metricContext.newContextAwareGauge
         (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, 
orchestrationDelayCounter::get);
-    this.metricContext.register(orchestrationDelayMetric);
+    metricContext.register(orchestrationDelayMetric);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ba614867f..59d7b0c9b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -84,11 +84,13 @@ import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
@@ -120,7 +122,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   protected final Optional<UserQuotaManager> quotaManager;
   protected final Optional<FlowTriggerHandler> flowTriggerHandler;
   @Getter
-  protected final Map<String, Spec> scheduledFlowSpecs;
+  protected final Map<String, FlowSpec> scheduledFlowSpecs;
   @Getter
   protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
   protected volatile int loadSpecsBatchSize = -1;
@@ -214,11 +216,12 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
TopologyCatalog topologyCatalog,
       DagManager dagManager, Optional<UserQuotaManager> quotaManager, 
SchedulerService schedulerService,
       Optional<Logger> log, boolean isWarmStandbyEnabled, Optional 
<FlowTriggerHandler> flowTriggerHandler,
-      SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
+      SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
DagManagementStateStore dagManagementStateStore,
+      FlowCompilationValidationHelper flowCompilationValidationHelper)
       throws Exception {
     this(serviceName, config, helixManager, flowCatalog,
         new Orchestrator(config, topologyCatalog, dagManager, log, 
flowStatusGenerator, flowTriggerHandler,
-            sharedFlowMetricsSingleton, flowCatalog),
+            sharedFlowMetricsSingleton, flowCatalog, dagManagementStateStore, 
flowCompilationValidationHelper),
         schedulerService, quotaManager, log, isWarmStandbyEnabled, 
flowTriggerHandler);
   }
 
@@ -502,7 +505,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
-      Spec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+      FlowSpec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
       // The trigger event time will be missing for adhoc and run-immediately 
flows, so we set the default here
       String triggerTimestampMillis = jobProps.getProperty(
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
@@ -597,7 +600,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     }
 
     // todo : we should probably not schedule a flow if it is a runOnce flow
-    this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+    this.scheduledFlowSpecs.put(flowSpecUri.toString(), flowSpec);
     this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), 
modificationTime);
 
     if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 7400602f7..8be9b384c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -17,23 +17,33 @@
 
 package org.apache.gobblin.service.modules.utils;
 
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -59,6 +69,27 @@ public final class FlowCompilationValidationHelper {
   private final FlowStatusGenerator flowStatusGenerator;
   private final boolean isFlowConcurrencyEnabled;
 
+  @Inject
+  public FlowCompilationValidationHelper(Config config, 
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
+      UserQuotaManager userQuotaManager, FlowStatusGenerator 
flowStatusGenerator) {
+    try {
+      String specCompilerClassName = ConfigUtils.getString(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
+          ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
+      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(
+          new 
ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), 
config);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException |
+             ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
+    this.quotaManager = userQuotaManager;
+    MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.specCompiler.getClass());
+    this.eventSubmitter = new EventSubmitter.Builder(metricContext, 
"org.apache.gobblin.service").build();
+    this.flowStatusGenerator = flowStatusGenerator;
+    this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+        ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
+  }
+
   /**
    * For a given a flowSpec, verifies that an execution is allowed (in case 
there is an ongoing execution) and the
    * flowspec can be compiled. If the pre-conditions hold, then a 
JobExecutionPlan is constructed and returned to the
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
similarity index 93%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index fbf6bc336..4eeb2bbee 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -39,7 +39,7 @@ import org.apache.gobblin.util.ConfigUtils;
  * A factory implementation that returns a {@link 
DagManagementDagActionStoreChangeMonitor} instance.
  */
 @Slf4j
-public class DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements 
Provider<DagActionStoreChangeMonitor> {
+public class DagManagementDagActionStoreChangeMonitorFactory implements 
Provider<DagActionStoreChangeMonitor> {
   static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = 
"numThreads";
 
   private final Config config;
@@ -50,7 +50,7 @@ public class 
DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements P
   private final DagManagement dagManagement;
 
   @Inject
-  public DagProcEngineEnabledDagActionStoreChangeMonitorFactory(Config config, 
DagManager dagManager, FlowCatalog flowCatalog,
+  public DagManagementDagActionStoreChangeMonitorFactory(Config config, 
DagManager dagManager, FlowCatalog flowCatalog,
       Orchestrator orchestrator, DagActionStore dagActionStore, DagManagement 
dagManagement,
       @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled) {
     this.config = Objects.requireNonNull(config);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 2162bed00..9fee0d3f1 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -62,6 +62,7 @@ import 
org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import 
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagStateStore;
 import 
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
@@ -161,6 +162,7 @@ public class GobblinServiceManagerTest {
     serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, 
FLOW_SPEC_STORE_DIR);
     serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_CLASS_KEY, 
"org.apache.gobblin.runtime.spec_store.MysqlSpecStore");
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, 
"flow_spec_store");
+    serviceCoreProperties.put(MysqlDagStateStore.CONFIG_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "dag_store");
     serviceCoreProperties.put(FlowCatalog.FLOWSPEC_SERDE_CLASS_KEY, 
"org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe");
 
     
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY,
 TEST_GOBBLIN_EXECUTOR_NAME);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index c9998f046..20425071a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -53,14 +53,13 @@ public class DagManagementTaskStreamImplTest {
     // Setting up mock DB
     testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
-    Config config;
     ConfigBuilder configBuilder = ConfigBuilder.create();
     
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
-    config = configBuilder.build();
+    Config config = configBuilder.build();
 
     // Constructing TopologySpecMap.
     Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
@@ -68,7 +67,7 @@ public class DagManagementTaskStreamImplTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, Optional.empty());
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 11a74d7db..776d5fbf5 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -79,7 +79,7 @@ public class DagProcessingEngineTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    MostlyMySqlDagManagementStateStore dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementTaskStream =
         new DagManagementTaskStreamImpl(config, Optional.empty());
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 0a8e421df..ed8eada1b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -60,14 +60,13 @@ public class MostlyMySqlDagManagementStateStoreTest {
     // Setting up mock DB
     testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
-    Config config;
     ConfigBuilder configBuilder = ConfigBuilder.create();
     
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
 TestMysqlDagStateStore.class.getName())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 testMetastoreDatabase.getJdbcUrl())
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
         
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE);
-    config = configBuilder.build();
+    Config config = configBuilder.build();
 
     // Constructing TopologySpecMap.
     Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
@@ -75,7 +74,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
     TopologySpec topologySpec = 
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
     URI specExecURI = new URI(specExecInstance);
     topologySpecMap.put(specExecURI, topologySpec);
-    this.dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null);
+    this.dagManagementStateStore = new 
MostlyMySqlDagManagementStateStore(config, null, null);
     this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
     this.dagManagementStateStore.start();
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 8d64b0978..6a590dd26 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -39,7 +39,9 @@ import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.FlowSpec;
@@ -79,6 +81,9 @@ public class OrchestratorTest {
   private FlowCatalog flowCatalog;
   private FlowSpec flowSpec;
   private Orchestrator orchestrator;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  private static final String TEST_TABLE = "quotas";
 
   @BeforeClass
   public void setup() throws Exception {
@@ -108,11 +113,21 @@ public class OrchestratorTest {
     FlowTriggerHandler mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
     DagManager mockDagManager = mock(DagManager.class);
     doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
+    Config config = ConfigBuilder.create()
+        
.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
+            
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
 TestMetastoreDatabaseFactory.get().getJdbcUrl())
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
 TEST_USER)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
 TEST_PASSWORD)
+        
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
 TEST_TABLE).build();
+
+    MostlyMySqlDagManagementStateStore dagManagementStateStore =
+        new MostlyMySqlDagManagementStateStore(config, null, null);
 
     this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
         this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockStatusGenerator,
-        Optional.of(mockFlowTriggerHandler), new SharedFlowMetricsSingleton(
-             ConfigUtils.propertiesToConfig(orchestratorProperties)), 
Optional.of(mock(FlowCatalog.class)));
+        Optional.of(mockFlowTriggerHandler), new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(
+            orchestratorProperties)), Optional.of(mock(FlowCatalog.class)), 
dagManagementStateStore, null);
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
     // Start application
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 45f8323a3..06691ad36 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -149,7 +149,7 @@ public class GobblinServiceJobSchedulerTest {
         .assertTrue(new Predicate<Void>() {
           @Override
           public boolean apply(Void input) {
-            Map<String, Spec> scheduledFlowSpecs = 
scheduler.scheduledFlowSpecs;
+            Map<String, FlowSpec> scheduledFlowSpecs = 
scheduler.scheduledFlowSpecs;
             if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
               return scheduler.scheduledFlowSpecs.containsKey("spec0") &&
                   scheduler.scheduledFlowSpecs.containsKey("spec1");
@@ -235,7 +235,7 @@ public class GobblinServiceJobSchedulerTest {
         .assertTrue(new Predicate<Void>() {
           @Override
           public boolean apply(Void input) {
-            Map<String, Spec> scheduledFlowSpecs = 
scheduler.scheduledFlowSpecs;
+            Map<String, FlowSpec> scheduledFlowSpecs = 
scheduler.scheduledFlowSpecs;
             if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
               return scheduler.scheduledFlowSpecs.containsKey("spec1") &&
                   scheduler.scheduledFlowSpecs.containsKey("spec2");
@@ -300,7 +300,7 @@ public class GobblinServiceJobSchedulerTest {
         .assertTrue(new Predicate<Void>() {
           @Override
           public boolean apply(Void input) {
-            Map<String, Spec> scheduledFlowSpecs = 
scheduler.scheduledFlowSpecs;
+            Map<String, FlowSpec> scheduledFlowSpecs = 
scheduler.scheduledFlowSpecs;
             if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 3) {
               return scheduler.scheduledFlowSpecs.containsKey("spec0") &&
                   scheduler.scheduledFlowSpecs.containsKey("spec1") &&
@@ -413,7 +413,7 @@ public class GobblinServiceJobSchedulerTest {
       if (flowName.equals(MockedSpecCompiler.UNCOMPILABLE_FLOW)) {
         throw new RuntimeException("Could not compile flow");
       }
-      super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+      super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), (FlowSpec) 
addedSpec);
       if (hasScheduler) {
         try {
           scheduleJob(((FlowSpec) addedSpec).getConfigAsProperties(), null);


Reply via email to