This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 221720526 [GOBBLIN-2013] update guice initialization for
'DagProcEngine enabled' and related classes (#3892)
221720526 is described below
commit 2217205267d1ab184b4fbc1e4e94550c04b09831
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Mar 11 23:32:33 2024 -0700
[GOBBLIN-2013] update guice initialization for 'DagProcEngine enabled' and
related classes (#3892)
* update guice module
* address review comment
---
.../gobblin/runtime/util/InjectionNames.java | 4 +++
.../modules/core/GobblinServiceConfiguration.java | 4 +++
.../modules/core/GobblinServiceGuiceModule.java | 27 ++++++++++++++++-
.../MostlyMySqlDagManagementStateStore.java | 4 +--
.../modules/orchestration/Orchestrator.java | 13 ++++----
.../modules/orchestration/proc/LaunchDagProc.java | 4 +--
.../scheduler/GobblinServiceJobScheduler.java | 13 ++++----
.../utils/FlowCompilationValidationHelper.java | 35 ++++++++++++++++++++--
...agementDagActionStoreChangeMonitorFactory.java} | 4 +--
.../gobblin/service/GobblinServiceManagerTest.java | 2 ++
.../DagManagementTaskStreamImplTest.java | 5 ++--
.../orchestration/DagProcessingEngineTest.java | 2 +-
.../MostlyMySqlDagManagementStateStoreTest.java | 5 ++--
.../modules/orchestration/OrchestratorTest.java | 19 ++++++++++--
.../scheduler/GobblinServiceJobSchedulerTest.java | 8 ++---
15 files changed, 115 insertions(+), 34 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
index b9ff94f8a..55aa2130d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
@@ -17,6 +17,9 @@
package org.apache.gobblin.runtime.util;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+
/**
* These names are used for dependency injection, when we need to inject
different instances of the same type,
* or inject constants.
@@ -29,4 +32,5 @@ public final class InjectionNames {
// TODO: Rename `warm_standby_enabled` config to
`message_forwarding_enabled` since it's a misnomer.
public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";
public static final String MULTI_ACTIVE_SCHEDULER_ENABLED =
"multiActiveSchedulerEnabled";
+ public static final String DAG_PROC_ENGINE_ENABLED =
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 1998b541d..83b64c900 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -73,6 +73,9 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean onlyAnnounceLeader;
+ @Getter
+ private final boolean isDagProcessingEngineEnabled;
+
@Getter
private final Config innerConfig;
@@ -114,5 +117,6 @@ public class GobblinServiceConfiguration {
this.isTopologySpecFactoryEnabled =
ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
this.onlyAnnounceLeader = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER, false);
+ this.isDagProcessingEngineEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index e108dec14..01b2598b0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -67,8 +67,15 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
+import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
+import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
@@ -79,10 +86,12 @@ import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionReso
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import
org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
+import
org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
@@ -154,6 +163,10 @@ public class GobblinServiceGuiceModule implements Module {
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED))
.to(serviceConfig.isMultiActiveSchedulerEnabled());
+ binder.bindConstant()
+ .annotatedWith(Names.named(InjectionNames.DAG_PROC_ENGINE_ENABLED))
+ .to(serviceConfig.isDagProcessingEngineEnabled());
+
OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
if (serviceConfig.isWarmStandbyEnabled()) {
binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
@@ -173,6 +186,12 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(FlowTriggerHandler.class);
}
+ binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
+ binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
+
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class).in(Singleton.class);
+ binder.bind(DagProcFactory.class);
+ binder.bind(DagProcessingEngine.class);
+
binder.bind(FlowConfigsResource.class);
binder.bind(FlowConfigsV2Resource.class);
binder.bind(FlowStatusResource.class);
@@ -188,6 +207,7 @@ public class GobblinServiceGuiceModule implements Module {
.to(NoopRequesterService.class);
binder.bind(SharedFlowMetricsSingleton.class);
+ binder.bind(FlowCompilationValidationHelper.class);
OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
binder.bind(TopologyCatalog.class);
@@ -248,7 +268,12 @@ public class GobblinServiceGuiceModule implements Module {
if (serviceConfig.isWarmStandbyEnabled()) {
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
-
binder.bind(DagActionStoreChangeMonitor.class).toProvider(DagActionStoreChangeMonitorFactory.class).in(Singleton.class);
+ if (serviceConfig.isDagProcessingEngineEnabled()) {
+ binder.bind(DagActionStoreChangeMonitor.class)
+
.toProvider(DagManagementDagActionStoreChangeMonitorFactory.class).in(Singleton.class);
+ } else {
+
binder.bind(DagActionStoreChangeMonitor.class).toProvider(DagActionStoreChangeMonitorFactory.class).in(Singleton.class);
+ }
}
binder.bind(GobblinServiceManager.class);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 48b789d11..10c69947f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -70,8 +70,8 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
FlowCatalog flowCatalog;
@Inject
- public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog
flowCatalog) throws IOException {
- this.quotaManager = new MysqlUserQuotaManager(config);
+ public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog
flowCatalog, UserQuotaManager userQuotaManager) {
+ this.quotaManager = userQuotaManager;
this.config = config;
this.flowCatalog = flowCatalog;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 618d8b050..daf6aff3f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -111,14 +111,13 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
@Getter
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
- private final ClassAliasResolver<SpecCompiler> aliasResolver;
-
@Inject
public Orchestrator(Config config, TopologyCatalog topologyCatalog,
DagManager dagManager,
Optional<Logger> log, FlowStatusGenerator flowStatusGenerator,
Optional<FlowTriggerHandler> flowTriggerHandler,
- SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog) {
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog,
+ DagManagementStateStore dagManagementStateStore,
FlowCompilationValidationHelper flowCompilationValidationHelper) throws
IOException {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
- this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
+ ClassAliasResolver<SpecCompiler> aliasResolver = new
ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
@@ -132,7 +131,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
_log.info("Using specCompiler class name/alias " +
specCompilerClassName);
- this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)),
config);
+ this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(specCompilerClassName)),
config);
} catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException |
ClassNotFoundException e) {
throw new RuntimeException(e);
@@ -140,6 +139,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
//At this point, the TopologySpecMap is initialized by the SpecCompiler.
Pass the TopologySpecMap to the DagManager.
this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
+ ((MostlyMySqlDagManagementStateStore)
dagManagementStateStore).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.specCompiler.getClass());
this.flowOrchestrationSuccessFulMeter =
this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER);
@@ -153,8 +153,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
config);
- this.flowCompilationValidationHelper = new
FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
- quotaManager, eventSubmitter, flowStatusGenerator,
isFlowConcurrencyEnabled);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@VisibleForTesting
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 72a59ee0e..d9afff5b4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -45,9 +45,9 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
public LaunchDagProc(LaunchDagTask launchDagTask) {
this.launchDagTask = launchDagTask;
this.orchestrationDelayCounter = new AtomicLong(0);
- ContextAwareGauge<Long> orchestrationDelayMetric =
this.metricContext.newContextAwareGauge
+ ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get);
- this.metricContext.register(orchestrationDelayMetric);
+ metricContext.register(orchestrationDelayMetric);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ba614867f..59d7b0c9b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -84,11 +84,13 @@ import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
@@ -120,7 +122,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
protected final Optional<UserQuotaManager> quotaManager;
protected final Optional<FlowTriggerHandler> flowTriggerHandler;
@Getter
- protected final Map<String, Spec> scheduledFlowSpecs;
+ protected final Map<String, FlowSpec> scheduledFlowSpecs;
@Getter
protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
protected volatile int loadSpecsBatchSize = -1;
@@ -214,11 +216,12 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
TopologyCatalog topologyCatalog,
DagManager dagManager, Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService,
Optional<Logger> log, boolean isWarmStandbyEnabled, Optional
<FlowTriggerHandler> flowTriggerHandler,
- SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
DagManagementStateStore dagManagementStateStore,
+ FlowCompilationValidationHelper flowCompilationValidationHelper)
throws Exception {
this(serviceName, config, helixManager, flowCatalog,
new Orchestrator(config, topologyCatalog, dagManager, log,
flowStatusGenerator, flowTriggerHandler,
- sharedFlowMetricsSingleton, flowCatalog),
+ sharedFlowMetricsSingleton, flowCatalog, dagManagementStateStore,
flowCompilationValidationHelper),
schedulerService, quotaManager, log, isWarmStandbyEnabled,
flowTriggerHandler);
}
@@ -502,7 +505,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
@Override
public void runJob(Properties jobProps, JobListener jobListener) throws
JobException {
try {
- Spec flowSpec =
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ FlowSpec flowSpec =
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
// The trigger event time will be missing for adhoc and run-immediately
flows, so we set the default here
String triggerTimestampMillis = jobProps.getProperty(
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
@@ -597,7 +600,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
// todo : we should probably not schedule a flow if it is a runOnce flow
- this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+ this.scheduledFlowSpecs.put(flowSpecUri.toString(), flowSpec);
this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(),
modificationTime);
if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 7400602f7..8be9b384c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -17,23 +17,33 @@
package org.apache.gobblin.service.modules.utils;
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.Map;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
@@ -59,6 +69,27 @@ public final class FlowCompilationValidationHelper {
private final FlowStatusGenerator flowStatusGenerator;
private final boolean isFlowConcurrencyEnabled;
+ @Inject
+ public FlowCompilationValidationHelper(Config config,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
+ UserQuotaManager userQuotaManager, FlowStatusGenerator
flowStatusGenerator) {
+ try {
+ String specCompilerClassName = ConfigUtils.getString(config,
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
+ ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
+ this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(
+ new
ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)),
config);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException |
+ ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
+ this.quotaManager = userQuotaManager;
+ MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.specCompiler.getClass());
+ this.eventSubmitter = new EventSubmitter.Builder(metricContext,
"org.apache.gobblin.service").build();
+ this.flowStatusGenerator = flowStatusGenerator;
+ this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
+ ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
+ }
+
/**
* For a given a flowSpec, verifies that an execution is allowed (in case
there is an ongoing execution) and the
* flowspec can be compiled. If the pre-conditions hold, then a
JobExecutionPlan is constructed and returned to the
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
similarity index 93%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index fbf6bc336..4eeb2bbee 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagProcEngineEnabledDagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -39,7 +39,7 @@ import org.apache.gobblin.util.ConfigUtils;
* A factory implementation that returns a {@link
DagManagementDagActionStoreChangeMonitor} instance.
*/
@Slf4j
-public class DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements
Provider<DagActionStoreChangeMonitor> {
+public class DagManagementDagActionStoreChangeMonitorFactory implements
Provider<DagActionStoreChangeMonitor> {
static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY =
"numThreads";
private final Config config;
@@ -50,7 +50,7 @@ public class
DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements P
private final DagManagement dagManagement;
@Inject
- public DagProcEngineEnabledDagActionStoreChangeMonitorFactory(Config config,
DagManager dagManager, FlowCatalog flowCatalog,
+ public DagManagementDagActionStoreChangeMonitorFactory(Config config,
DagManager dagManager, FlowCatalog flowCatalog,
Orchestrator orchestrator, DagActionStore dagActionStore, DagManagement
dagManagement,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled) {
this.config = Objects.requireNonNull(config);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 2162bed00..9fee0d3f1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -62,6 +62,7 @@ import
org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagStateStore;
import
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
@@ -161,6 +162,7 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY,
FLOW_SPEC_STORE_DIR);
serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_CLASS_KEY,
"org.apache.gobblin.runtime.spec_store.MysqlSpecStore");
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
"flow_spec_store");
+ serviceCoreProperties.put(MysqlDagStateStore.CONFIG_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "dag_store");
serviceCoreProperties.put(FlowCatalog.FLOWSPEC_SERDE_CLASS_KEY,
"org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe");
serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY,
TEST_GOBBLIN_EXECUTOR_NAME);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index c9998f046..20425071a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -53,14 +53,13 @@ public class DagManagementTaskStreamImplTest {
// Setting up mock DB
testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
- Config config;
ConfigBuilder configBuilder = ConfigBuilder.create();
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
- config = configBuilder.build();
+ Config config = configBuilder.build();
// Constructing TopologySpecMap.
Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
@@ -68,7 +67,7 @@ public class DagManagementTaskStreamImplTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null);
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config, Optional.empty());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 11a74d7db..776d5fbf5 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -79,7 +79,7 @@ public class DagProcessingEngineTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null);
+ MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null);
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config, Optional.empty());
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
index 0a8e421df..ed8eada1b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStoreTest.java
@@ -60,14 +60,13 @@ public class MostlyMySqlDagManagementStateStoreTest {
// Setting up mock DB
testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
- Config config;
ConfigBuilder configBuilder = ConfigBuilder.create();
configBuilder.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
TestMysqlDagStateStore.class.getName())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY),
testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY),
TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY),
TEST_PASSWORD)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY),
TEST_TABLE);
- config = configBuilder.build();
+ Config config = configBuilder.build();
// Constructing TopologySpecMap.
Map<URI, TopologySpec> topologySpecMap = new HashMap<>();
@@ -75,7 +74,7 @@ public class MostlyMySqlDagManagementStateStoreTest {
TopologySpec topologySpec =
DagTestUtils.buildNaiveTopologySpec(specExecInstance);
URI specExecURI = new URI(specExecInstance);
topologySpecMap.put(specExecURI, topologySpec);
- this.dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null);
+ this.dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null);
this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementStateStore.start();
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 8d64b0978..6a590dd26 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -39,7 +39,9 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.typesafe.config.Config;
+import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.FlowSpec;
@@ -79,6 +81,9 @@ public class OrchestratorTest {
private FlowCatalog flowCatalog;
private FlowSpec flowSpec;
private Orchestrator orchestrator;
+ private static final String TEST_USER = "testUser";
+ private static final String TEST_PASSWORD = "testPassword";
+ private static final String TEST_TABLE = "quotas";
@BeforeClass
public void setup() throws Exception {
@@ -108,11 +113,21 @@ public class OrchestratorTest {
FlowTriggerHandler mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
DagManager mockDagManager = mock(DagManager.class);
doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
+ Config config = ConfigBuilder.create()
+ .addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
+ MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
+ .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), TestMetastoreDatabaseFactory.get().getJdbcUrl())
+ .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER)
+ .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD)
+ .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE).build();
+
+ MostlyMySqlDagManagementStateStore dagManagementStateStore =
+ new MostlyMySqlDagManagementStateStore(config, null, null);
this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, mockDagManager, Optional.of(logger),
mockStatusGenerator,
- Optional.of(mockFlowTriggerHandler), new SharedFlowMetricsSingleton(
- ConfigUtils.propertiesToConfig(orchestratorProperties)), Optional.of(mock(FlowCatalog.class)));
+ Optional.of(mockFlowTriggerHandler), new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(
+ orchestratorProperties)), Optional.of(mock(FlowCatalog.class)), dagManagementStateStore, null);
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 45f8323a3..06691ad36 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -149,7 +149,7 @@ public class GobblinServiceJobSchedulerTest {
.assertTrue(new Predicate<Void>() {
@Override
public boolean apply(Void input) {
- Map<String, Spec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
+ Map<String, FlowSpec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
return scheduler.scheduledFlowSpecs.containsKey("spec0") &&
scheduler.scheduledFlowSpecs.containsKey("spec1");
@@ -235,7 +235,7 @@ public class GobblinServiceJobSchedulerTest {
.assertTrue(new Predicate<Void>() {
@Override
public boolean apply(Void input) {
- Map<String, Spec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
+ Map<String, FlowSpec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2) {
return scheduler.scheduledFlowSpecs.containsKey("spec1") &&
scheduler.scheduledFlowSpecs.containsKey("spec2");
@@ -300,7 +300,7 @@ public class GobblinServiceJobSchedulerTest {
.assertTrue(new Predicate<Void>() {
@Override
public boolean apply(Void input) {
- Map<String, Spec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
+ Map<String, FlowSpec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
if (scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 3) {
return scheduler.scheduledFlowSpecs.containsKey("spec0") &&
scheduler.scheduledFlowSpecs.containsKey("spec1") &&
@@ -413,7 +413,7 @@ public class GobblinServiceJobSchedulerTest {
if (flowName.equals(MockedSpecCompiler.UNCOMPILABLE_FLOW)) {
throw new RuntimeException("Could not compile flow");
}
- super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+ super.scheduledFlowSpecs.put(addedSpec.getUri().toString(), (FlowSpec) addedSpec);
if (hasScheduler) {
try {
scheduleJob(((FlowSpec) addedSpec).getConfigAsProperties(), null);